在 Lambda 中处理 SQS 事件源错误
为了处理与 SQS 事件源相关的错误,Lambda 会自动使用带有回退策略的重试策略。您还可以通过配置 SQS 事件源映射来返回部分批次响应,从而自定义错误处理行为。
失败调用的回退策略
如果调用失败,Lambda 会在实施回退策略时尝试重试调用。回退策略略有不同,具体取决于 Lambda 的故障原因是函数代码中的错误还是节流所致。
-
如果您的函数代码导致了该错误,Lambda 将停止处理并重试调用。同时,Lambda 会逐渐退出,以减少分配给 Amazon SQS 事件源映射的并发量。队列的可见性超时结束后,消息将再次显示在队列中。
-
如果调用失败是节流造成的,Lambda 会通过减少分配给 Amazon SQS 事件源映射的并发量来逐渐停止重试。Lambda 会继续重试该消息,直到消息的时间戳超过队列的可见性超时,此时 Lambda 会删除该消息。
实施部分批处理响应
预设情况下,如果您的 Lambda 函数在处理某个批处理时遇到错误,则该批处理中的所有消息都会在队列中重新可见,包括 Lambda 已经成功处理的消息。因此,您的函数最终可能会多次处理同一消息。
对于处理失败的批处理,要避免重新处理其中已经成功处理的消息,您可以将事件源映射配置为仅使失败的消息重新可见。这称为部分批处理响应。要开启部分批处理响应,请在配置事件源映射时为 FunctionResponseTypes 操作指定 ReportBatchItemFailures
。这可以让您的函数返回部分成功,从而有助于减少对记录进行不必要的重试次数。
激活 ReportBatchItemFailures
后,当函数调用失败时,Lambda 不会 缩减消息轮询范围。如果您预计某些消息会失败,并且不希望这些失败影响消息处理速率,请使用 ReportBatchItemFailures
。
激活部分批处理报告
-
查看 实施部分批处理响应的最佳实践。
-
运行以下命令来为您的函数激活 ReportBatchItemFailures
。要检索事件源映射的 UUID,请运行 list-event-source-mappings AWS CLI 命令。
aws lambda update-event-source-mapping \
--uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE"
\
--function-response-types "ReportBatchItemFailures"
-
更新您的函数代码以捕获所有异常并在 batchItemFailures
JSON 响应中返回处理失败的消息。batchItemFailures
响应必须包含消息 ID 列表,以作为 itemIdentifier
JSON 值。
例如,假设一个批处理有五条消息,消息 ID 分别为 id1
、id2
、id3
、id4
和 id5
。您的函数成功处理了 id1
、id3
和 id5
。要使消息 id2
和 id4
在队列中重新可见,您的函数应运行以下响应:
{
"batchItemFailures": [
{
"itemIdentifier": "id2"
},
{
"itemIdentifier": "id4"
}
]
}
以下函数代码示例将返回批处理中处理失败消息的 ID 列表:
- .NET
-
- AWS SDK for .NET
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
报告使用 .NET 进行 Lambda SQS 批处理项目失败。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace sqsSample;
public class Function
{
public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context)
{
List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>();
foreach(var message in evnt.Records)
{
try
{
//process your message
await ProcessMessageAsync(message, context);
}
catch (System.Exception)
{
//Add failed message identifier to the batchItemFailures list
batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId});
}
}
return new SQSBatchResponse(batchItemFailures);
}
private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)
{
if (String.IsNullOrEmpty(message.Body))
{
throw new Exception("No Body in SQS Message.");
}
context.Logger.LogInformation($"Processed message {message.Body}");
// TODO: Do interesting work based on the new message
await Task.CompletedTask;
}
}
- Go
-
- 适用于 Go V2 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
报告使用 Go 进行 Lambda SQS 批处理项目失败。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
)
func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) {
batchItemFailures := []map[string]interface{}{}
for _, message := range sqsEvent.Records {
if /* Your message processing condition here */ {
batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId})
}
}
sqsBatchResponse := map[string]interface{}{
"batchItemFailures": batchItemFailures,
}
return sqsBatchResponse, nil
}
func main() {
lambda.Start(handler)
}
- Java
-
- SDK for Java 2.x
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
报告使用 Java 进行 Lambda SQS 批处理项目失败。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import java.util.ArrayList;
import java.util.List;
public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> {
@Override
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>();
String messageId = "";
for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) {
try {
//process your message
messageId = message.getMessageId();
} catch (Exception e) {
//Add failed message identifier to the batchItemFailures list
batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId));
}
}
return new SQSBatchResponse(batchItemFailures);
}
}
- JavaScript
-
- 适用于 JavaScript 的 SDK(v3)
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
报告使用 JavaScript 进行 Lambda SQS 批处理项目失败。
// Node.js 20.x Lambda runtime, AWS SDK for Javascript V3
export const handler = async (event, context) => {
const batchItemFailures = [];
for (const record of event.Records) {
try {
await processMessageAsync(record, context);
} catch (error) {
batchItemFailures.push({ itemIdentifier: record.messageId });
}
}
return { batchItemFailures };
};
async function processMessageAsync(record, context) {
if (record.body && record.body.includes("error")) {
throw new Error("There is an error in the SQS Message.");
}
console.log(`Processed message: ${record.body}`);
}
报告使用 TypeScript 进行 Lambda SQS 批处理项目失败。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda';
export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => {
const batchItemFailures: SQSBatchItemFailure[] = [];
for (const record of event.Records) {
try {
await processMessageAsync(record);
} catch (error) {
batchItemFailures.push({ itemIdentifier: record.messageId });
}
}
return {batchItemFailures: batchItemFailures};
};
async function processMessageAsync(record: SQSRecord): Promise<void> {
if (record.body && record.body.includes("error")) {
throw new Error('There is an error in the SQS Message.');
}
console.log(`Processed message ${record.body}`);
}
- PHP
-
- 适用于 PHP 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
报告使用 PHP 进行 Lambda SQS 批处理项目失败。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
use Bref\Context\Context;
use Bref\Event\Sqs\SqsEvent;
use Bref\Event\Sqs\SqsHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends SqsHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handleSqs(SqsEvent $event, Context $context): void
{
$this->logger->info("Processing SQS records");
$records = $event->getRecords();
foreach ($records as $record) {
try {
// Assuming the SQS message is in JSON format
$message = json_decode($record->getBody(), true);
$this->logger->info(json_encode($message));
// TODO: Implement your custom processing logic here
} catch (Exception $e) {
$this->logger->error($e->getMessage());
// failed processing the record
$this->markAsFailed($record);
}
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords SQS records");
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- SDK for Python (Boto3)
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
报告使用 Python 进行 Lambda SQS 批处理项目失败。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def lambda_handler(event, context):
if event:
batch_item_failures = []
sqs_batch_response = {}
for record in event["Records"]:
try:
# process message
except Exception as e:
batch_item_failures.append({"itemIdentifier": record['messageId']})
sqs_batch_response["batchItemFailures"] = batch_item_failures
return sqs_batch_response
- Ruby
-
- 适用于 Ruby 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
报告使用 Ruby 进行 Lambda SQS 批处理项目失败。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'json'
def lambda_handler(event:, context:)
if event
batch_item_failures = []
sqs_batch_response = {}
event["Records"].each do |record|
begin
# process message
rescue StandardError => e
batch_item_failures << {"itemIdentifier" => record['messageId']}
end
end
sqs_batch_response["batchItemFailures"] = batch_item_failures
return sqs_batch_response
end
end
- Rust
-
- 适用于 Rust 的 SDK
-
查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。
报告使用 Rust 进行 Lambda SQS 批处理项目失败。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
event::sqs::{SqsBatchResponse, SqsEvent},
sqs::{BatchItemFailure, SqsMessage},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
async fn process_record(_: &SqsMessage) -> Result<(), Error> {
Err(Error::from("Error processing message"))
}
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> {
let mut batch_item_failures = Vec::new();
for record in event.payload.records {
match process_record(&record).await {
Ok(_) => (),
Err(_) => batch_item_failures.push(BatchItemFailure {
item_identifier: record.message_id.unwrap(),
}),
}
}
Ok(SqsBatchResponse {
batch_item_failures,
})
}
#[tokio::main]
async fn main() -> Result<(), Error> {
run(service_fn(function_handler)).await
}
如果处理失败的事件没有返回到队列,请参阅 AWS 知识中心中的 如何排查 Lambda 函数 SQS ReportBatchItemFailures 问题?。
成功和失败的条件
如果您的函数返回以下任意一项,则 Lambda 会将批处理视为完全成功:
如果您的函数返回以下任意一项,则 Lambda 会将批处理视为完全失败:
CloudWatch 指标
要确定函数是否在正确报告批处理项目失败情况,您可以监控 Amazon CloudWatch 中的 NumberOfMessagesDeleted
和 ApproximateAgeOfOldestMessage
Amazon SQS 指标。