将 AWS Lambda 与 Amazon Kinesis 结合使用 - AWS Lambda

将 AWS Lambda 与 Amazon Kinesis 结合使用

您可以使用 AWS Lambda 函数来处理 Amazon Kinesis 数据流中的记录。

Kinesis 数据流是一组分片。每个分片包含一系列数据记录。使用者 是一种处理 Kinesis 数据流中的数据的应用程序。您可以将 Lambda 函数映射到共享吞吐量使用者(标准迭代器)或具有增强扇出功能的专用吞吐量使用者。

对于标准迭代器,Lambda 使用 HTTP 协议轮询 Kinesis 流中的每个分片以查找记录。事件源映射与分片的其他使用者共享读取吞吐量。

为了最大限度地减少延迟并最大限度地提高读取吞吐量,您可以创建具有增强扇出功能的数据流使用者。流使用者将获得与每个分片的专用连接,这不会影响从流中读取信息的其他应用程序。如果您有许多应用程序读取相同的数据,或者您正在重新处理具有大记录的流,则专用吞吐量可以提供帮助。Kinesis 通过 HTTP/2 将数据推送到 Lambda。

有关 Kinesis 数据流的详细信息,请参阅读取 Amazon Kinesis Data Streams 中的数据

注意

错误处理不适用于 HTTP/2 流使用者。

Lambda 从数据流中读取记录,并使用包含流记录的事件同步调用您的函数。Lambda 以批量方式读取记录并调用您的函数来处理批次中的记录。

例 Kinesis 记录事件

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }

默认情况下,只要流中有记录,Lambda 就会调用您的函数。如果 Lambda 从流中读取的批处理中只有一条记录,则 Lambda 只会向该函数发送一条记录。为避免在记录数量较少的情况下调用该函数,您可以配置批处理时段,让事件源缓冲最多 5 分钟的记录。在调用该函数之前,Lambda 会继续从流中读取记录,直到收集了完整批次,或者直到批处理时段到期。

如果您的函数返回一个错误,则 Lambda 将重试批处理,直到处理成功或数据过期。为避免分片停滞,可以将事件源映射配置为以较小的批处理大小重试,限制重试次数或者丢弃太早的记录。要保留丢弃的事件,可以配置事件源映射,以将有关失败批处理的详细信息发送到 SQS 队列或 SNS 主题。

您还可以通过并行处理每个分片的多个批处理来提高并发性。在每个分片中,Lambda 最多可以同时处理 10 个批处理。如果您增加每个分片的并发批处理数量,则 Lambda 仍然需要确保在分区键级别进行有序处理。

配置您的数据流和函数

您的 Lambda 函数是数据流的用户应用程序。对于每个分片,它一次处理一批记录。您可以将 Lambda 函数映射到数据流(标准迭代器),或映射到流的使用者(增强型扇出功能)。

对于标准迭代器,Lambda 将针对记录轮询 Kinesis 流中的每个分片(按照每秒一次的基本频率)。当有更多记录可用时,Lambda 会继续进行批处理,直到函数赶上流的速度。事件源映射与分片的其他使用者共享读取吞吐量。

为了最大限度地减少延迟并最大限度地提高读取吞吐量,请创建具有增强扇出功能的数据流使用者。增强扇出功能使用者将获得与每个分片的专用连接,这不会影响从流中读取信息的其他应用程序。流使用者使用 HTTP/2 通过长期连接将记录推送到 Lambda 并压缩请求头来减少延迟。您可以使用 Kinesis RegisterStreamConsumer API 创建流使用者。

$ aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream { "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

要提高函数处理记录的速度,请将分片添加到数据流中。Lambda 会按顺序处理每个分片中的记录。如果您的函数返回错误,它会停止处理分片中的其他记录。使用更多分片,可以同时处理更多批次,从而降低错误对并发性的影响。

如果您的函数无法扩展以处理并发批处理的总数,请为您的函数请求提高配额预留并发

执行角色权限

Lambda 需要以下权限才能管理与您的 Kinesis 数据流相关的资源。将它们添加到您的函数的 执行角色

AWSLambdaKinesisExecutionRole 托管策略包含这些权限。有关更多信息,请参阅 AWS Lambda 执行角色

要将失败批处理的记录发送到队列或主题,您的函数需要其他权限。每项目标服务均需要不同的权限,如下所示:

将流配置为事件源

创建事件源映射以指示 Lambda 将数据流中的记录发送到 Lambda 函数。您可以创建多个事件源映射,以使用多个 Lambda 函数处理相同的数据,或使用单个函数处理来自多个数据流的项目。

要在 Lambda 控制台中将您的函数配置为从 Kinesis 读取,请创建 Kinesis 触发器。

创建触发器

  1. 打开 Lambda 控制台 函数页面

  2. 选择函数。

  3. Designer 下方,选择 Add trigger (添加触发器)

  4. 选择触发器类型。

  5. 配置所需选项,然后选择 Add (添加)

Lambda 支持 Kinesis 事件源的以下选项。

事件源选项

  • Kinesis 流 – 从中读取记录的 Kinesis 流。

  • 使用者(可选)– 使用流使用者通过专用连接从流中读取。

  • Batch size (批处理大小) – 每个批处理中发送到函数的记录的数量(最多 10000 条)。Lambda 通过单个调用将批处理中的所有记录传递给函数,前提是事件的总大小未超出同步调用的负载限制 (6 MB)。

  • 批处理时段 – 指定在调用函数之前收集记录的最长时间(以秒为单位)。

  • 开始位置 – 仅处理新记录、所有现有记录或在特定日期之后创建的记录。

    • 最新 – 处理已添加到流中的新记录。

    • 时间范围 – 处理流中的所有记录。

    • 位于时间戳 – 处理从特定时间开始的记录。

    在处理任何现有记录后,函数将继续处理新记录。

  • On-failure destination (故障目标) – SQS 队列或 SNS 主题,用于无法处理的记录。当 Lambda 由于时间太远或已用尽所有重试而丢弃一批记录时,它将有关该批处理的详细信息发送到队列或主题。

  • Retry attempts (重试) – 函数返回错误时 Lambda 重试的最大次数。这不适用于批处理未到达该函数的服务错误或限制。

  • Maximum age of record (最长记录期限) – Lambda 发送到函数的记录的最长期限。

  • Split batch on error (出错时拆分批处理) – 当函数返回错误时,请在重试之前将批处理拆分为两部分。

  • Concurrent batches per shard (每个分片的并发批处理) – 并发处理来自同一分片的多个批处理。

  • 已启用 – 设置为 true 可启用事件源映射。设置为 false 可停止处理记录。Lambda 将跟踪已处理的最后一条记录,并在重新启用后从停止位置重新开始处理。

注意

Kinesis 按每个分片收费;对于增强型扇出功能,从流中读取数据。有关定价的详细信息,请参阅 Amazon Kinesis 定价

之后,要管理事件源配置,请在设计器中选择触发器。

事件源映射 API

要使用 AWS CLI 或 AWS 开发工具包管理事件源映射,请使用以下 API 操作:

要使用 AWS CLI 创建事件源映射,请使用 create-event-source-mapping 命令。以下示例使用 AWS CLI 将名为 my-function 的函数映射到 Kinesis 数据流。数据流由 Amazon 资源名称 (ARN) 指定,批处理大小为 500,从以 Unix 时间表示的时间戳开始。

$ aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream { "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

要使用一个使用者,请指定使用者的 ARN 而不是流的 ARN。

配置其他选项,以自定义如何处理批处理,并指定何时丢弃无法处理的记录。以下示例更新事件源映射,以在两次重试之后或者如果失败记录已存在一个小时以上,将失败记录发送到 SQS 队列。

$ aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}' { "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

更新的设置是异步应用的,并且直到该过程完成才反映在输出中。使用 get-event-source-mapping 命令可查看当前状态。

$ aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b { "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

要同时处理多个批处理,请使用 --parallelization-factor 选项。

$ aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

错误处理

从 Kinesis 流中读取记录的事件源映射将同步调用函数并在出错时重试。如果函数受到限制,或者 Lambda 服务未调用该函数而返回错误,Lambda 将重试,直到记录到期或者超过您在事件源映射上配置的最长期限。

如果函数接收到记录但返回错误,Lambda 将重试,直到批处理中的记录到期、超过最大使用期限或者达到配置的重试配额。对于函数错误,您还可以配置事件源映射,以将失败的批处理拆分为两个批处理。重试较小的批处理可以隔离不良记录并解决超时问题。拆分批处理不计入重试配额。

如果错误处理措施失败,Lambda 将丢弃记录并继续处理流中的批处理。使用默认设置,这意味着不良记录最多可以将针对受影响分片的处理操作阻止one week。为避免这种情况,请以合理的重试次数和适合您使用案例的最长记录期限来配置函数的事件源映射。

要保留废弃批处理的记录,请配置失败事件目标。Lambda 将文档和有关批处理的详细信息发送到目标队列或主题。

配置失败事件记录的目标

  1. 打开 Lambda 控制台 函数页面

  2. 选择函数。

  3. Designer (设计器) 下,选择 Add destination (添加目标)

  4. 对于 Source (源),选择 Stream invocation (流调用)

  5. 对于 Stream (流),选择映射到函数的流。

  6. 对于 Destination type (目标类型),请选择接收调用记录的资源类型。

  7. 对于 Destination (目标),请选择一个资源。

  8. 选择保存

以下示例显示了 Kinesis 流的调用记录。

例 调用记录

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

您可以使用此信息从流中检索受影响的记录以进行故障排除。实际的记录不包括在内,因此您必须处理这些记录,并在它们到期并丢失之前从流中检索它们。

Amazon CloudWatch 指标

在您的函数处理完一批记录后,Lambda 将发出 IteratorAge 指标。该指标指示处理完成时批处理中最后一条记录的时间。如果您的函数正在处理新事件,则可使用迭代器期限来估算新记录的添加时间与函数处理新记录的时间之间的延迟。

迭代器期限中的上升趋势可以指示您的函数问题。有关更多信息,请参阅使用 AWS Lambda 函数指标