将 Lambda 与 Amazon SQS 结合使用 - AWS Lambda

将 Lambda 与 Amazon SQS 结合使用

注意

如果想要将数据发送到 Lambda 函数以外的目标,或要在发送数据之前丰富数据,请参阅 Amazon EventBridge Pipes(Amazon EventBridge 管道)。

您可以使用 Lambda 函数来处理某个 Amazon Simple Queue Service(Amazon SQS)队列中的消息。Lambda 支持事件源映射标准队列先进先出(FIFO)队列

了解 Amazon SQS 事件源映射的轮询和批处理行为

使用 Amazon SQS 事件源映射,Lambda 会轮询队列并通过事件同步调用您的函数。每个事件可以包含来自队列的一批多条消息。Lambda 每次接收一批这些事件,并为每个批次调用一次您的函数。当您的函数成功处理一个批次后,Lambda 就会将其消息从队列中删除。

当 Lambda 接收某个批次时,消息将保留在队列中,但会根据队列的可见性超时长度隐藏。如果您的函数成功处理了批次中的所有消息,Lambda 会将消息从队列中删除。预设情况下,如果您的函数在处理某个批处理时遇到错误,则该批处理中的所有消息都会在可见性超时过期后在队列中重新可见。因此,函数代码必须能够多次处理同一条消息,而不会产生意外的副作用。

警告

Lambda 事件源映射至少处理每个事件一次,有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题,我们强烈建议您将函数代码设为幂等性。要了解更多信息,请参阅 AWS 知识中心的如何使我的 Lambda 函数具有幂等性

要防止 Lambda 多次处理消息,您可以将事件源映射配置为在函数响应中包含批次项目失败,也可以在 Lambda 函数成功处理消息后使用 DeleteMessage API 将消息从队列中删除。

有关 Lambda 支持的 SQS 事件源映射配置参数的更多信息,请参阅 创建 SQS 事件源映射

示例标准队列消息事件

例 Amazon SQS 消息事件(标准队列)
{ "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ] }

默认情况下,Lambda 将一次性轮询队列中最多 10 条消息,并将该批次发送到函数。为避免在记录数量较少的情况下调用该函数,您可以配置批次时段,将事件源配置为缓冲最多五分钟的记录。在调用函数之前,Lambda 将继续轮询标准队列中的消息,直到批次时段到期、达到调用有效负载大小配额或达到配置的最大批次大小为止。

如果您使用的是批处理窗口,并且 SQS 队列包含的流量非常低,Lambda 可能会等待最多 20 秒钟才能调用您的函数。即使您将批处理窗口设置为低于 20 秒,情况依然如此。

注意

在 Java 中,反序列化 JSON 时可能会遇到空指针错误。这可能要归因于 JSON 对象映射器转换“Records”和“eventSourceARN”大小写的方式。

示例 FIFO 队列消息事件

对于 FIFO 队列,记录包含与重复数据消除和顺序相关的其他属性。

例 Amazon SQS 消息事件(FIFO 队列)
{ "Records": [ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ] }