Lambda 事件源映射 - AWS Lambda

Lambda 事件源映射

事件源映射是一个从事件源读取并调用 Lambda 函数的 Lambda 资源。您可以使用事件源映射来处理未直接调用 Lambda 函数的服务中的流或队列中的项。Lambda 为以下服务提供事件源映射。

事件源映射使用函数执行角色中的权限来读取和管理事件源中的项。权限、事件结构、设置和轮询行为因事件源而异。有关更多信息,请参阅用作事件源的服务的链接主题。

要使用 AWS Command Line Interface(AWS CLI)AWS SDK 来管理事件源,可以使用以下 API 操作:

以下示例使用 AWS CLI 将一个名为 my-function 的函数映射到由其 Amazon Resource Name(ARN)指定的 DynamoDB 流,其中批处理大小为 500。

aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525

您应看到以下输出:

{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

批处理行为

事件源映射从目标事件源读取项目。预设情况下,事件源映射会将记录合并为单个有效负载进行批处理,并由 Lambda 将其发送到您的函数。您可以配置批处理时段 (MaximumBatchingWindowInSeconds) 和批处理大小 (BatchSize) 来优化批处理行为。批处理时段是将记录收集到单个有效负载中的最长时间。批处理大小是单个批处理中的最大记录数。满足以下三个条件中的任意一个时,Lambda 会调用您的函数:

  • 批处理时段达到其最大值。批处理时段行为因特定的事件源而异。

    • 对于 Kinesis、DynamoDB 和 Amazon SQS 事件源:原定设置的批处理时段是 0 秒。这意味着 Lambda 会尽快向您的函数发送批处理。如果您配置了 MaximumBatchingWindowInSeconds,则下一个批处理时段将在上一个函数调用完成后立即开始计算。

    • 对于 Amazon MSK、自行管理的 Apache Kafka 和 Amazon MQ 事件源:原定设置的批处理时段为 500 毫秒。您可以将 MaximumBatchingWindowInSeconds 配置为介于 0 秒到 300 秒之间的任意值,以秒的整数倍调整。第一条记录到达后,批处理时段将立即开始计算。

      注意

      由于您只能以秒的整数倍调整 MaximumBatchingWindowInSeconds,您无法在更改该值后恢复到 500 毫秒的原定设置批处理时段。要恢复原定设置的批处理时段,必须创建新的事件源映射。

  • 达到批处理大小。最小批处理大小为 1。原定设置和最大批处理大小取决于事件源。有关这些值的详细信息,请参阅 CreateEventSourceMapping API 操作的 BatchSize 规范。

  • 有效负载大小达到 6MB您不能修改此限制。

下图演示了这三个条件。假设批处理时段从 t = 7 秒开始。在第一种场景中,批处理时段累积 5 条记录后在 t = 47 秒达到 40 秒的最大值。在第二种场景中,批处理大小在批处理时段到期之前达到 10,因此批处理时段会提前结束。在第三种场景中,在批处理时段到期之前达到最大有效负载大小,因此批处理时段会提前结束。


        当满足以下三个条件中的任何一个时,批处理时段将到期:批处理时段达到最大值、达到批处理大小或有效负载大小达到 6MB。

以下示例显示了从 Kinesis 流读取的事件源映射。如果一批事件的所有处理尝试失败,则事件源映射将有关该批次的详细信息发送到 SQS 队列。


        从 Kinesis 流读取的事件源映射。它在本地将记录排队后才将记录发送给函数。

事件批次是 Lambda 发送到函数的事件。它是由事件源映射在当前批处理时段到期之前读取的项目组成的一批记录或消息。

对于流,事件源映射为流中的每个分片创建迭代器,并按顺序处理每个分片中的项。您可以将事件源映射配置为只读取流中显示的新项,或者从较旧的项开始。已处理的项目不会从流中删除,并且可以由其他函数或使用者处理。

预设情况下,如果函数返回错误,事件源映射会重新处理整个批处理,直到函数成功,或直到批处理中的项目到期。为确保按顺序处理,在错误得到解决之前,事件源映射会暂停处理受影响的分片。您可以将事件源映射配置为放弃旧事件、限制重试次数或并行处理多个批次。如果并行处理多个批处理,仍然保证每个分区键按顺序处理,但事件源映射会同时处理同一分片中的多个分区键。

您还可以将事件源映射配置为在放弃某个事件批次时向其他服务发送调用记录。Lambda 支持以下事件源映射的目标

  • Amazon SQS – SQS 队列。

  • Amazon SNS – SNS 主题。

调用记录包含 JSON 格式的失败事件批次的详细信息。

以下示例显示了 Kinesis 流的调用记录。

例 调用记录

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

Lambda 还支持 FIFO(先进先出)队列的有序处理,可纵向扩展到活动消息组的数量。对于标准队列,不一定按顺序处理项。Lambda 向上扩展以尽可能快地处理标准队列。出现错误时,Lambda 会将批处理作为单个项退回队列,并且可在与原始批处理不同的分组中处理。有时,即使没有发生任何函数错误,事件源映射也可能会从队列中接收相同的项两次。Lambda 在成功处理项后将其从队列中删除。如果 Lambda 无法处理项目,您可以配置源队列以将项目发送到死信队列。

有关直接调用 Lambda 函数的服务的信息,请参阅 将 AWS Lambda 与其他服务一起使用