将 Lambda 与 Amazon SQS 结合使用 - AWS Lambda

将 Lambda 与 Amazon SQS 结合使用

您可以使用 Lambda 函数来处理某个 Amazon Simple Queue Service(Amazon SQS)队列中的消息。Lambda 事件源映射支持标准队列先进先出 (FIFO) 队列。在 Amazon SQS 中,您可以通过将来自一个应用程序组件的任务发送到一个队列中并异步处理它们来进行分载。

Lambda 轮询队列并同步调用您的 Lambda 函数,其中有包含队列消息的事件。Lambda 按批次读取消息,并为每个批次调用一次函数。当您的函数成功处理一个批次后,Lambda 就会将其消息从队列中删除。以下示例显示了包含两条消息的批次事件。

例 Amazon SQS 消息事件(标准队列)

{ "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ] }

默认情况下,Lambda 将一次性轮询队列中最多 10 条消息,并将该批次发送到函数。为避免在记录数量较少的情况下调用该函数,您可以配置批处理时段,让事件源缓冲最多五分钟的记录。在调用函数之前,Lambda 将继续轮询 SQS 标准队列中的消息,直到批处理时段到期、达到调用有效负载大小配额或达到配置的最大批处理大小为止。

注意

如果您使用的是批处理窗口,并且 SQS 队列包含的流量非常低,Lambda 可能会等待最多 20 秒钟才能调用您的函数。即使您将批处理窗口设置为低于 20 秒,情况依然如此。

对于 FIFO 队列,记录包含与重复数据消除和顺序相关的其他属性。

例 Amazon SQS 消息事件(FIFO 队列)

{ "Records": [ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ] }

当 Lambda 读取某个批处理时,消息将保留在队列中,但会根据队列的可见性超时长度隐藏。如果您的函数成功处理一个批次,Lambda 会将其消息从队列中删除。预设情况下,如果您的函数在处理某个批处理时遇到错误,则该批处理中的所有消息都会在队列中重新可见。因此,函数代码必须能够多次处理同一条消息,而不会产生意外的副作用。您可以通过在函数响应中包括批处理项目失败次数来修改此再处理行为。

扩展和处理

对于标准队列,Lambda 使用长轮询来轮询一个队列,直到它变为活动状态。当消息可用时,Lambda 最多可读取五个批处理并将其发送到您的函数。如果仍有消息可用,则 Lambda 增加批量读取的进程数,最多每分钟增加 60 个实例。事件源映射可以同时处理的最大批处理数量为 1,000。

对于 FIFO 队列,Lambda 按照接收消息的顺序向函数发送消息。向 FIFO 队列发送消息时,需要指定消息组 ID。Amazon SQS 确保同一组中的消息按顺序传递到 Lambda。Lambda 将消息分组,并且每次只为一组发送一个批次。如果函数返回错误,函数会对受影响的消息尝试所有重试,然后 Lambda 才会接收来自同一个组的其他消息。

您的函数可以在并发范围内扩展到活动消息组的数量。有关更多信息,请参阅 AWS 计算博客上的作为事件源的 SQS FIFO

配置队列以便用于 Lambda

创建一个 SQS 队列,用作您的 Lambda 函数事件源。然后将队列配置为可使您的 Lambda 函数有时间处理每批事件,并使 Lambda 在扩展时出现限制错误时能够重试。

为使您的函数有时间处理每批记录,请将源队列的可见性超时至少设置为您在函数上配置的超时的六倍。这一额外的时间有利于 Lambda 在您的函数处理之前的批处理期间遇到限流时进行重试。

如果函数多次都未能处理某条消息,则 Amazon SQS 可以将其发送到某个死信队列。当您的函数返回错误时,Lambda 将其留在队列中。在发生可见性超时之后,Lambda 重新接收消息。要在多次接收之后将消息发送到第二个队列,请在源队列上配置死信队列。

注意

确保在源队列上配置死信队列,而不是在 Lambda 函数上配置。您在函数上配置的死信队列用于函数的异步调用队列,而不是用于事件源队列。

如果您的函数返回错误,或者由于处于最大并发而无法调用,则处理可能会成功,但需要额外尝试。要在将消息发送到死信队列之前给予更好的处理机会,请将源队列重新驱动策略的 maxReceiveCount 设置为至少 5

执行角色权限

Lambda 需要以下权限来管理 Amazon SQS 队列中的消息。将这些权限添加到您的函数的执行角色中。

将队列配置为事件源

创建事件源映射以指示 Lambda 将队列中的项目发送到 Lambda 函数。您可以创建多个事件源映射,以使用单个函数处理来自多个队列的项目。当 Lambda 调用目标函数时,事件可以包含多个项目(最多为可配置的最大批处理大小)。

要在 Lambda 控制台中将您的函数配置为从 Amazon SQS 读取,请创建 SQS 触发器。

创建触发器

  1. 打开 Lamba 控制台的 Functions(函数)页面

  2. 选择一个函数的名称。

  3. Function overview (函数概览) 下,选择 Add trigger (添加触发器)

  4. 选择 SQS 触发器类型。

  5. 配置必填选项,然后选择 Add(添加)。

Lambda 支持 Amazon SQS 事件源的以下选项。

事件源选项

  • SQS queue(SQS 队列)– 要从其读取记录的 Amazon SQS 队列。

  • Batch size(批处理大小)– 每个批次中要发送到函数的记录数。对于标准队列,这最高可为 10,000 条记录。对于 FIFO 队列,最大值为 10。对于批次大小超过 10,还必须将 MaximumBatchingWindowInSeconds 参数设置为至少 1 秒。Lambda 通过单个调用将批处理中的所有记录传递给函数,前提是事件的总大小未超出同步调用的调用有效负载大小配额(6 MB)。

    Lambda 和 Amazon SQS 都会为每条记录生成元数据。这一额外的元数据将会计入总有效负载大小,并且可能导致批处理中发送的记录总数低于配置的批处理大小。Amazon SQS 发送的元数据字段的长度是可变的。有关 Amazon SQS 元数据字段的更多信息,请参阅《Amazon Simple Queue Service API 参考》中的 ReceiveMessage API 操作文档。

  • Batch window(批处理时段)– 在调用函数之前收集记录的最长时间(以秒为单位)。它仅适用于标准队列。

    如果您使用的批处理时段大于 0 秒,则必须考虑队列可见性超时中增加的处理时间。我们建议将队列可见性超时设置为函数超时的六倍加上 MaximumBatchingWindowInSeconds 的值。这使 Lambda 函数有时间处理每个批次的事件,并在出现节流错误时重试。

    注意

    如果您的批处理时段大于 0,并且 (batch window) + (function timeout) > (queue visibility timeout),则您的有效队列可见性超时为 (batch window) + (function timeout) + 30s

    Lambda 一次最多处理五个批处理。这意味着任何时候最多有五个工件可供批处理使用和并行处理消息。每个工件会为其当前批处理的消息显示不同的 Lambda 调用。

  • Enabled(已启用)– 事件源映射的状态。设置为 true 以启用事件源映射。设置为 false 可停止处理记录。

注意

Amazon SQS 为一定量的请求提供了永久性的免费套餐。超出免费套餐后,Amazon SQS 按每百万个请求收取费用。当您的事件源映射处于活动状态时,Lambda 会向队列发出请求以获取项目。有关定价详细信息,请参阅 Amazon SQS 定价

之后要管理事件源配置,请打开 Lambda 控制台,并在设计器中选择 SQS 触发器。

配置您的函数超时,以允许有足够的时间来处理整个批次的项目。如果项目处理需要很长时间,请选择一个较小的批处理大小。大批量处理可以提高非常快速或拥有大量开销的工作负载的效率。但是,如果您的函数返回错误,则批处理中的所有项目都将返回到队列中。如果您在函数上配置了预留并发,请将最小并发执行数设置为 5,以降低在 Lambda 调用函数时出现节流错误的几率。要消除节流错误的几率,请将预留并发值设置为 1,000,这是 Amazon SQS 事件源的最大并发执行次数。

事件源映射 API

要使用 AWS Command Line Interface (AWS CLI)AWS SDK 来管理事件源,可以使用以下 API 操作:

以下示例使用 AWS CLI 将名为 my-function 的函数映射到由 Amazon Resource Name (ARN) 指定的 Amazon SQS 队列,批处理大小为 5,批处理窗口为 60 秒。

aws lambda create-event-source-mapping --function-name my-function --batch-size 5 \ --maximum-batching-window-in-seconds 60 \ --event-source-arn arn:aws:sqs:us-east-2:123456789012:my-queue

您应看到以下输出:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 5, "MaximumBatchingWindowInSeconds": 60, "EventSourceArn": "arn:aws:sqs:us-east-2:123456789012:my-queue", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "State": "Creating", "StateTransitionReason": "USER_INITIATED" }

报告批处理项目失败

预设情况下,如果您的 Lambda 函数在处理某个批处理时遇到错误,则该批处理中的所有消息都会在队列中重新可见,包括 Lambda 已经成功处理的消息。因此,您的函数最终可能会多次处理同一消息。

为避免重新处理失败的批处理中的所有消息,您可以将事件源映射配置为仅使失败的消息重新可见。为此,在配置事件源映射时,请在 FunctionResponseTypes 列表中包含值 ReportBatchItemFailures。这可以让您的函数返回部分成功,从而有助于减少对记录进行不必要的重试次数。

报告语法

在事件源映射配置中包含 ReportBatchItemFailures 后,您可以在函数响应中返回失败的消息 ID 列表。例如,假设一个批处理有五条消息,消息 ID 分别为 id1id2id3id4id5。您的函数成功处理了 id1id3id5。为使消息 id2id4 在队列中重新可见,您的响应语法应与以下类似:

{ "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

要返回批处理中失败的消息 ID 列表,您可以使用 SQSBatchResponse 类对象,也可以创建自己的自定义类。以下是一个使用 SQSBatchResponse 对象的响应示例。

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }

要使用此功能,您的函数必须能够从容处理错误。让函数逻辑捕捉所有异常并在函数响应中报告在 batchItemFailures 中导致失败的消息。如果您函数发现了一个异常,则整个批处理将被视为完全失败。

注意

如果您将此功能与 FIFO 队列结合使用,则您的函数应在第一次失败后停止处理消息,并返回 batchItemFailures 中的所有失败和未处理的消息。这有助于保持队列中消息的顺序。

成功和失败的条件

如果您的函数返回以下任意一项,则 Lambda 会将批处理视为完全成功:

  • 空的 batchItemFailures 列表

  • Null batchItemFailures 列表

  • 空的 EventResponse

  • Null EventResponse

如果您的函数返回以下任意一项,则 Lambda 会将批处理视为完全失败:

  • JSON 响应无效

  • 空字符串 itemIdentifier

  • Null itemIdentifier

  • 包含错误密钥名的 itemIdentifier

  • 具有某个消息 ID 的 itemIdentifier 值不存在

CloudWatch 指标

要确定函数是否在正确报告批处理项目失败情况,您可以监控 Amazon CloudWatch 中的 NumberOfMessagesDeletedApproximateAgeOfOldestMessage Amazon SQS 指标。

  • NumberOfMessagesDeleted 会跟踪从队列中删除的消息数量。如果该值降至 0,则表明您的函数响应没有正确返回失败的消息。

  • ApproximateAgeOfOldestMessage 会跟踪最早的消息在队列中停留的时间。如果此指标急剧增加,则可能表明您的函数没有正确返回失败的消息。

Amazon SQS 配置参数

所有 Lambda 事件源类型共享相同的 CreateEventSourceMappingUpdateEventSourceMapping API 操作。但是,只有部分参数适用于 Amazon SQS。

适用于 Amazon SQS 的事件源参数
参数 必需 默认值 备注

BatchSize

10

对于标准队列,最大值为 10000。对于 FIFO 队列,最大值为 10。

已启用

true

EventSourceArn

数据流或流使用者的 ARN

FunctionName

FunctionResponseTypes

要使您的函数报告某个批处理中的特定失败,请在 FunctionResponseTypes 中包含值 ReportBatchItemFailures。有关更多信息,请参阅 报告批处理项目失败

MaximumBatchingWindowInSeconds

0