在 Lambda 中处理 SQS 事件源错误 - AWS Lambda

在 Lambda 中处理 SQS 事件源错误

为了处理与 SQS 事件源相关的错误,Lambda 会自动使用带有回退策略的重试策略。您还可以通过配置 SQS 事件源映射来返回部分批次响应,从而自定义错误处理行为。

失败调用的回退策略

如果调用失败,Lambda 会在实施回退策略时尝试重试调用。回退策略略有不同,具体取决于 Lambda 的故障原因是函数代码中的错误还是节流所致。

  • 如果您的函数代码导致了该错误,Lambda 将停止处理并重试调用。同时,Lambda 会逐渐退出,以减少分配给 Amazon SQS 事件源映射的并发量。队列的可见性超时结束后,消息将再次显示在队列中。

  • 如果调用失败是节流造成的,Lambda 会通过减少分配给 Amazon SQS 事件源映射的并发量来逐渐停止重试。Lambda 会继续重试该消息,直到消息的时间戳超过队列的可见性超时,此时 Lambda 会删除该消息。

实施部分批处理响应

预设情况下,如果您的 Lambda 函数在处理某个批处理时遇到错误,则该批处理中的所有消息都会在队列中重新可见,包括 Lambda 已经成功处理的消息。因此,您的函数最终可能会多次处理同一消息。

对于处理失败的批处理,要避免重新处理其中已经成功处理的消息,您可以将事件源映射配置为仅使失败的消息重新可见。这称为部分批处理响应。要开启部分批处理响应,请在配置事件源映射时为 FunctionResponseTypes 操作指定 ReportBatchItemFailures。这可以让您的函数返回部分成功,从而有助于减少对记录进行不必要的重试次数。

激活 ReportBatchItemFailures 后,当函数调用失败时,Lambda 不会 缩减消息轮询范围。如果您预计某些消息会失败,并且不希望这些失败影响消息处理速率,请使用 ReportBatchItemFailures

注意

在使用部分批处理响应时,请记住以下几点:

  • 如果您函数发现了一个异常,则整个批处理将被视为完全失败。

  • 如果您将此功能与 FIFO 队列结合使用,则您的函数应在第一次失败后停止处理消息,并返回 batchItemFailures 中的所有失败和未处理的消息。这有助于保持队列中消息的顺序。

激活部分批处理报告
  1. 查看 实施部分批处理响应的最佳实践

  2. 运行以下命令来为您的函数激活 ReportBatchItemFailures。要检索事件源映射的 UUID,请运行 list-event-source-mappings AWS CLI 命令。

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. 更新您的函数代码以捕获所有异常并在 batchItemFailures JSON 响应中返回处理失败的消息。batchItemFailures 响应必须包含消息 ID 列表,以作为 itemIdentifier JSON 值。

    例如,假设一个批处理有五条消息,消息 ID 分别为 id1id2id3id4id5。您的函数成功处理了 id1id3id5。要使消息 id2id4 在队列中重新可见,您的函数应运行以下响应:

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    以下函数代码示例将返回批处理中处理失败消息的 ID 列表:

    .NET
    AWS SDK for .NET
    注意

    查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 .NET 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }
    Go
    适用于 Go V2 的 SDK
    注意

    查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 Go 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, message := range sqsEvent.Records { if /* Your message processing condition here */ { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId}) } } sqsBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return sqsBatchResponse, nil } func main() { lambda.Start(handler) }
    Java
    SDK for Java 2.x
    注意

    查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 Java 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }
    JavaScript
    适用于 JavaScript 的 SDK(v3)
    注意

    查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 JavaScript 进行 Lambda SQS 批处理项目失败。

    // Node.js 20.x Lambda runtime, AWS SDK for Javascript V3 export const handler = async (event, context) => { const batchItemFailures = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return { batchItemFailures }; }; async function processMessageAsync(record, context) { if (record.body && record.body.includes("error")) { throw new Error("There is an error in the SQS Message."); } console.log(`Processed message: ${record.body}`); }

    报告使用 TypeScript 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda'; export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => { const batchItemFailures: SQSBatchItemFailure[] = []; for (const record of event.Records) { try { await processMessageAsync(record); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return {batchItemFailures: batchItemFailures}; }; async function processMessageAsync(record: SQSRecord): Promise<void> { if (record.body && record.body.includes("error")) { throw new Error('There is an error in the SQS Message.'); } console.log(`Processed message ${record.body}`); }
    PHP
    适用于 PHP 的 SDK
    注意

    查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 PHP 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php use Bref\Context\Context; use Bref\Event\Sqs\SqsEvent; use Bref\Event\Sqs\SqsHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends SqsHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleSqs(SqsEvent $event, Context $context): void { $this->logger->info("Processing SQS records"); $records = $event->getRecords(); foreach ($records as $record) { try { // Assuming the SQS message is in JSON format $message = json_decode($record->getBody(), true); $this->logger->info(json_encode($message)); // TODO: Implement your custom processing logic here } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $this->markAsFailed($record); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords SQS records"); } } $logger = new StderrLogger(); return new Handler($logger);
    Python
    SDK for Python (Boto3)
    注意

    查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 Python 进行 Lambda SQS 批处理项目失败。

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Ruby
    适用于 Ruby 的 SDK
    注意

    查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 Ruby 进行 Lambda SQS 批处理项目失败。

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'json' def lambda_handler(event:, context:) if event batch_item_failures = [] sqs_batch_response = {} event["Records"].each do |record| begin # process message rescue StandardError => e batch_item_failures << {"itemIdentifier" => record['messageId']} end end sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response end end
    Rust
    适用于 Rust 的 SDK
    注意

    查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

    报告使用 Rust 进行 Lambda SQS 批处理项目失败。

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }

如果处理失败的事件没有返回到队列,请参阅 AWS 知识中心中的 如何排查 Lambda 函数 SQS ReportBatchItemFailures 问题?

成功和失败的条件

如果您的函数返回以下任意一项,则 Lambda 会将批处理视为完全成功:

  • 空的 batchItemFailures 列表

  • Null batchItemFailures 列表

  • 空的 EventResponse

  • Null EventResponse

如果您的函数返回以下任意一项,则 Lambda 会将批处理视为完全失败:

  • JSON 响应无效

  • 空字符串 itemIdentifier

  • Null itemIdentifier

  • 包含错误密钥名的 itemIdentifier

  • 具有某个消息 ID 的 itemIdentifier 值不存在

CloudWatch 指标

要确定函数是否在正确报告批处理项目失败情况,您可以监控 Amazon CloudWatch 中的 NumberOfMessagesDeletedApproximateAgeOfOldestMessage Amazon SQS 指标。

  • NumberOfMessagesDeleted 会跟踪从队列中删除的消息数量。如果该值降至 0,则表明您的函数响应没有正确返回失败的消息。

  • ApproximateAgeOfOldestMessage 会跟踪最早的消息在队列中停留的时间。如果此指标急剧增加,则可能表明您的函数没有正确返回失败的消息。