搭配 Amazon SQS 使用 Lambda - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

搭配 Amazon SQS 使用 Lambda

注意

如果您想要將資料傳送到 Lambda 函數以外的目標,或在傳送資料之前豐富資料,請參閱 Amazon EventBridge 管道

您可以使用 Lambda 函數處理 Amazon Simple Queue Service (Amazon SQS) 佇列中的訊息。Lambda 事件來源映射支援標準佇列先進先出 (FIFO) 佇列。使用 Amazon SQS 時,您可以藉由將任務傳送到佇列並進行非同步處理,以從應用程式中的一個元件中卸載任務。

Lambda 輪詢佇列並使用包含佇列訊息的事件同步調用 Lambda 函數。Lambda 會以批次讀取訊息,並為每個批次調用一次函數。當您的函數成功處理批次時,Lambda 會從佇列中刪除其訊息。

當 Lambda 讀取批次時,訊息會留在佇列中,但是在佇列的可見性逾時期間會變成隱藏。如果您的函數成功處理批次,Lambda 會從佇列中刪除訊息。根據預設,如果您的函數在處理批次時遇到錯誤,則該批次中的所有訊息會在可見性逾時到期後再次顯示在佇列中。因此,您的函數程式碼必須能夠多次處理相同的訊息,而不會產生副作用。

警告

Lambda 事件來源對應至少處理每個事件一次,而且可能會重複處理批次。為了避免與重複事件相關的潛在問題,我們強烈建議您將函數代碼設為冪等。若要深入了解,請參閱 AWS 知識中心如何讓 Lambda 函數具有冪等性

若要防止 Lambda 多次處理訊息,您可以設定事件來源對應以在函數回應中包含批次項目失敗,或者在 Lambda 函數成功處理訊息時,使用 Amazon SQS API 動作DeleteMessage將訊息從佇列中移除。如需使用 Amazon SQS API 的詳細資訊,請參閱 Amazon 簡單佇列服務 API 參考

標準佇列訊息事件範例

範例 Amazon SQS 訊息事件 (標準佇列)
{ "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ] }

依預設,Lambda 會一次輪詢佇列中最多 10 則訊息,並將該批次傳送給函數。為避免調用具有少量記錄的函數,您可設定批次間隔,請求事件來源緩衝記錄最長達五分鐘。調用函數之前,Lambda 會繼續從 SQS 標準佇列輪詢訊息,直至批次間隔到期、達到調用酬載大小配額,或達到設定的批次大小上限為止。

如果您使用的是批次時間範圍,並且您的 SQS 佇列包含非常低的流量,Lambda 可能會等到 20 秒,然後再調用您的函數。即使您將批次時間範圍設定為低於 20 秒也是如此。

注意

在 Java 中,還原序列化 JSON 時可能會遇到 null 指標錯誤。這可能是由於 JSON 物件映射器對「Records」和「eventSourceARN」案例轉換的方式所致。

FIFO 佇列訊息事件範例

對於 FIFO 佇列,記錄包含與重複資料刪除和定序相關的其他屬性。

範例 Amazon SQS 訊息事件 (FIFO 佇列)
{ "Records": [ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ] }

配置要搭配 Lambda 使用的佇列

建立 SQS 佇列來作為您的 Lambda 函數的事件來源。然後設定佇列以允許您的 Lambda 函數能夠有處理每個事件批次的時間,也讓 Lambda 在擴大規模時可重新行嘗試調節錯誤。

為讓您的函數有時間處理每個記錄批次,請將來源佇列的可見性逾時設定為您為函數設定之逾時的至少六倍。萬一您的函數在處理前一個批次時遭到調節,額外的時間可允許 Lambda 重試。

如果函數多次處理訊息失敗,Amazon SQS 可將訊息傳送到無效字母佇列。如果函數傳回錯誤,批次中的所有項目將返回佇列中。發生可見性逾時之後,Lambda 會再次收到訊息。若要在多次接收後將訊息傳送至第二個佇列,請在來源佇列上設定無效字母佇列。

注意

請確定在來源佇列上設定無效字母佇列,而不是在 Lambda 函數上。您在函數上設定的無效字母佇列用於佇列的非同步調用函數,而不是事件來源佇列。

如果您的函數因為以達到並行上限而傳回錯誤或無法調用,可能可以透過進行額外的嘗試來使處理成功。若要讓訊息在傳送到無效字母佇列前更有機會受到處理,請將來源佇列再驅動政策上的 maxReceiveCount 設為至少 5

執行角色許可

AWSLambdaSQSQueueExecutionRole AWS 管政策包括 Lambda 需要從您的 Amazon SQS 佇列讀取的許可。將此受管原則新增至函數的執行角色。

或者,如果您使用的是加密佇列,也需要將下列許可新增至您的執行角色:

新增權限並建立事件來源對應

建立事件來源映射,指示 Lambda 從您的佇列傳送項目至 Lambda 函數。您可以建立多個事件來源映射,使用單一函數處理來自多個佇列的項目。當 Lambda 調用目標函數時,事件可能包含多個項目,最多為可設定的最大批次大小

若要將函數設定為從 Amazon SQS 讀取,請將AWSLambdaSQSQueueExecutionRole AWS 受管政策附加到執行角色,然後建立 SQS 觸發器。

若要新增權限並建立觸發器
  1. 開啟 Lambda 主控台中的 函數頁面

  2. 選擇函數的名稱。

  3. 依序選擇 Configuration (組態) 索引標籤和 Permissions (許可)。

  4. 在 [角色名稱] 下,選擇指向您的執行角色的連結。此連結會在 IAM 主控台中開啟角色。

    
              連結至執行角色
  5. 選擇新增許可,然後選擇連接政策

    
              在 IAM 主控台中附加政策
  6. 在搜尋欄位中輸入 AWSLambdaSQSQueueExecutionRole。將此原則新增至您的執行角色。這是一項 AWS 受管政策,其中包含函數需要從 Amazon SQS 佇列讀取的許可。如需有關此原則的詳細資訊,請參閱AWSLambdaSQSQueueExecutionRoleAWS 管理的原則參考中的。

  7. 返回 Lambda 主控台中的函數。在 函式概觀 下,選擇 新增觸發條件

    
              Lambda 主控台的函數概觀區段
  8. 選擇觸發條件類型。

  9. 設定需要的選項,然後選擇 新增

Lambda 支援 Amazon SQS 事件來源的下列選項:

SQS 佇列

要從中讀取記錄的 Amazon SQS 佇列。

啟用觸發條件

事件來源映射的狀態。系統會預設選取 啟用觸發條件

批次大小

每個批次中要傳送至函數的記錄數量上限。若是標準佇列,這最高可達 10,000 個記錄。若是 FIFO 佇列,最大值為 10。批次大小若超過 10,您還必須將批次間隔 (MaximumBatchingWindowInSeconds) 設定為至少 1 秒。

設定函數逾時以允許足夠時間來處理整個項目批次。如果項目需要長時間處理,請選擇較小的批次大小。大型批次大小可提升效率,適用非常快速或開銷龐大的工作負載。如果您在函數上設定保留並行,應最少設定五個並行執行,藉此降低 Lambda 調用函數時的調節錯誤機率。

Lambda 會將批次中所有記錄以單一呼叫傳送至函數,前提是事件的總大小不超過同步調用的調用酬載大小配額 (6 MB)。Lambda 和 Amazon SQS 兩者均會產生每筆記錄的中繼資料。這個額外的中繼資料會計入總酬載大小,並且可能會導致批次中傳送的記錄總數低於您設定的批次大小。Amazon SQS 傳送的中繼資料欄位長度可能有所不同。如需 Amazon SQS 中繼資料欄位的詳細資訊,請參閱 Amazon 簡單佇列服務 ReceiveMessageAPI 參考中的 API 操作文件。

批次視窗

調用函式前收集記錄的最長時間 (單位為秒)。這僅適用於標準佇列。

如果您使用的批次間隔大於 0 秒,則必須考慮佇列可見性逾時中增加的處理時間。我們建議將您的佇列可見性逾時設定為函數逾時的六倍,再加上 MaximumBatchingWindowInSeconds 的值。這可讓您的 Lambda 函數有時間處理每個事件批次,並在發生節流錯誤時重試。

當訊息可供使用,Lambda 會開始批次處理訊息。Lambda 會開始一次處理五個批次,而函數有五個並行調用。如果訊息仍可用,則 Lambda 每分鐘會再為函數增加最多 300 個執行個體,上限是 1,000 個函數執行個體。若要深入了解函數擴展與並行,請參閱 Lambda 函數擴展

若要處理更多訊息,您可以最佳化 Lambda 函數以達到更高的輸送量。請參閱了解如何使用 Amazon SQS 標準佇列 AWS Lambda 擴展規模

最大並行數量

事件來源可以調用的並行函數上限。如需詳細資訊,請參閱 設定 Amazon SQS 事件來源的並行上限

篩選條件

新增篩選條件來控制 Lambda 要傳送哪些事件給函數進行處理。如需詳細資訊,請參閱 Lambda 事件篩選

擴展和處理

針對標準佇列,Lambda 會使用長輪詢來輪詢佇列,直至其處於作用中狀態。當訊息可用時,Lambda 會開始一次處理五個批次,而函數有五個並行調用。如果訊息仍然可用,則 Lambda 會將讀取批次的處理數量增加為每分鐘最多 300 個執行個體。事件來源映射可同時處理的批次數量上限為 1,000。

針對 FIFO 佇列,Lambda 會按照其接收的順序傳送訊息到您的函數。當您傳送訊息到 FIFO 佇列時,您可以指定訊息群組 ID。Amazon SQS 可確保相同群組中的訊息依序傳遞至 Lambda。Lambda 會將訊息分成群組,並且一次只傳送一個群組的一個批次。如果您的函數傳回錯誤,函數會先在受影響的訊息上嘗試所有重試,之後 Lambda 才會從相同群組接收到額外訊息。

您的函數可以並行擴展至作用中訊息群組的數量。如需詳細資訊,請參閱 AWS 計算部落格上的 SQS FIFO 做為事件來源

設定 Amazon SQS 事件來源的並行上限

並行上限設定限制了 Amazon SQS 事件來源可以調用的函數並行執行個體數。並行上限是事件來源層級的設定。如果您將多個 Amazon SQS 事件來源映射到一個函數,那麼每個事件來源都可以有個別的並行上限設定。您可以使用並行上限來防止一個佇列用完函數的所有預留並行配額,或其餘的帳戶並行配額。對 Amazon SQS 事件來源設定並行無需付費。

重要的是,並行上限和預留並行是兩項獨立的設定。請勿將並行上限設為超過函數的預留並行。若您設定了並行上限,請確定函數的預留並行大於或等於函數上所有 Amazon SQS 事件來源的總並行上限。若小於此上限,Lambda 可能會限流您的訊息。

如果沒有設定並行上限,Lambda 可能會將您的 Amazon SQS 事件來源擴展至帳戶的總並行配額 (預設為 1,000)。

注意

對於 FIFO 佇列,並行調用的上限是訊息群組 ID (messageGroupId) 的數量或最大並行設定 (以較低者為準)。例如,如果您有六個訊息群組 ID,而最大並行設定為 10,則函數最多可以有六個並行調用。

您可以對新的和現有的 Amazon SQS 事件來源映射設定並行上限。

使用 Lambda 主控台設定並行上限
  1. 開啟 Lambda 主控台中的 函數頁面

  2. 選擇函數的名稱。

  3. 函數概觀 下,選擇 SQS。這會開啟 Configuration (組態) 索引標籤。

  4. 選取 Amazon SQS 觸發條件,然後選擇 編輯

  5. Maximum concurrency (並行上限) 請輸入介於 2 到 1,000 之間的數字。若要關閉並行上限,請將方塊保留空白。

  6. 選擇 儲存

使用 AWS Command Line Interface (AWS CLI)配置最大並發

使用 update-event-source-mapping 命令搭配 --scaling-config 選項。範例:

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config '{"MaximumConcurrency":5}'

若要關閉並行上限,請為 --scaling-config 輸入空值:

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config "{}"
使用 Lambda API 設定並行上限

ScalingConfig對物件使用CreateEventSourceMappingUpdateEventSourceMapping動作。

事件來源映射 API

若要使用 AWS Command Line Interface (AWS CLI)AWS SDK 來管理事件來源,可以使用下列 API 操作:

下列範例使用 AWS CLI 將名為的函數對應my-function至 Amazon SQS 佇列 (由其 Amazon 資源名稱 (ARN) 指定,批次大小為 5,批次視窗為 60 秒。

aws lambda create-event-source-mapping --function-name my-function --batch-size 5 \ --maximum-batching-window-in-seconds 60 \ --event-source-arn arn:aws:sqs:us-east-2:123456789012:my-queue

您應該會看到下列輸出:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 5, "MaximumBatchingWindowInSeconds": 60, "EventSourceArn": "arn:aws:sqs:us-east-2:123456789012:my-queue", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "State": "Creating", "StateTransitionReason": "USER_INITIATED" }

失敗調用的輪詢策略

當調用失敗,Lambda 會在實作輪詢策略時嘗試重試調用。根據 Lambda 是否因函數程式碼錯誤或限流而發生失敗,輪詢策略會略有不同。

  • 如果您的函數代碼導致錯誤,Lambda 將停止處理並重試調用。與此同時,Lambda 會逐漸退回,減少分配給 Amazon SQS 事件來源映射的並行數量。佇列的可見性逾時用完之後,訊息會再次出現在佇列中。

  • 如果是限流導致調用失敗,Lambda 會減少分配給 Amazon SQS 事件來源映射的並行數量,逐漸重試輪詢。Lambda 會持續重試訊息,直到訊息的時間戳記超過佇列的可見性逾時為止,此時 Lambda 會捨棄訊息。

實作部分批次回應

當 Lambda 函數在處理批次時遇到錯誤,根據預設,該批次中的所有訊息會再次顯示在佇列中,包含 Lambda 已順利處理的訊息。因此,您的函數可能最後會處理數次相同的訊息。

若要避免重新處理失敗批次中成功處理過的訊息,您可以設定事件來源映射,僅讓失敗的訊息再次可見。我們將其稱為部分批次回應。若要開啟部分批次回應,請ReportBatchItemFailures在配置事件來源對應時指定FunctionResponseTypes動作。這可以讓您的函數傳回部分成功,有助於減少記錄上不必要的重試次數。

啟動 ReportBatchItemFailures 時,Lambda 不會在函數調用失敗時 縮減訊息輪詢的規模。如果您預期部分訊息會失敗,且不希望這些失敗影響到訊息的處理速度,則請使用 ReportBatchItemFailures

注意

使用部分批次回應時,請注意下列事項:

  • 如果函數擲出例外情況,便會將整個批次視為完全失敗。

  • 如果您將此功能與 FIFO 佇列一起使用,您的函數應該在第一次失敗後停止處理訊息,並傳回 batchItemFailures 中所有失敗與尚未處理的訊息。這有助於保留佇列中訊息的順序。

若要啟動部分批次報告
  1. 檢閱 實作部分批次回應的最佳實務

  2. 執行以下命令以便為函數啟用 ReportBatchItemFailures。若要擷取事件來源對應的 UUID,請執行指list-event-source-mappings AWS CLI 令。

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. 更新函數程式碼以擷取所有例外狀況,並傳回 batchItemFailures JSON 回應中的失敗訊息。batchItemFailures 回應必須含有以 itemIdentifier JSON 值表示的訊息 ID 清單。

    例如,假設您有五則訊息的批次,其中,訊息 ID 分別為 id1id2id3id4,以及 id5。您的函數已成功處理 id1id3,以及 id5。若要讓訊息 id2id4 再次於佇列中可見,您的函數應傳回以下回應:

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    以下範例函數程式碼會傳回批次中失敗訊息 ID 的清單:

    .NET
    AWS SDK for .NET
    注意

    還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 .NET 搭配 Lambda 報告 SQS 批次項目失敗。

    using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }
    Go
    SDK for Go V2
    注意

    還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 Go 使用 Lambda 報告 SQS 批次項目失敗。

    package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, message := range sqsEvent.Records { if /* Your message processing condition here */ { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId}) } } sqsBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return sqsBatchResponse, nil } func main() { lambda.Start(handler) }
    Java
    適用於 Java 2.x 的 SDK
    注意

    還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 Java 搭配 Lambda 報告 SQS 批次項目失敗。

    import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }
    JavaScript
    適用於 JavaScript (v2) 的開發套件
    注意

    還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 JavaScript Lambda 報告 SQS 批次項目失敗

    export const handler = async (event, context) => { const batchItemFailures = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return { batchItemFailures }; }; async function processMessageAsync(record, context) { if (record.body && record.body.includes("error")) { throw new Error("There is an error in the SQS Message."); } console.log(`Processed message: ${record.body}`); }

    使用 TypeScript Lambda 報告 SQS 批次項目失敗

    import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda'; export const handler = async (event: APIGatewayProxyEvent, context: Context): Promise<APIGatewayProxyResult> => { const batchItemFailures: { ItemIdentifier: string }[] = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ ItemIdentifier: record.messageId }); } } return { statusCode: 200, body: JSON.stringify({ batchItemFailures }), }; }; async function processMessageAsync(record: any, context: Context): Promise<void> { if (!record.body) { throw new Error('No Body in SQS Message.'); } context.log(`Processed message ${record.body}`); }
    PHP
    適用於 PHP 的開發套件
    注意

    還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 PHP 使用 Lambda 回報 SQS 批次項目失敗。

    <?php use Bref\Context\Context; use Bref\Event\Sqs\SqsEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $sqsEvent = new SqsEvent($event); $this->logger->info("Processing SQS records"); $records = $sqsEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { // Assuming the SQS message is in JSON format $message = json_decode($record->getBody(), true); $this->logger->info(json_encode($message)); // TODO: Implement your custom processing logic here } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getMessageId(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords SQS records"); // Format failures for the response $failures = array_map( fn(string $messageId) => ['itemIdentifier' => $messageId], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger); ?>
    Python
    適用於 Python (Boto3) 的 SDK
    注意

    還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 Python 搭配 Lambda 報告 SQS 批次項目失敗。

    import json def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Ruby
    適用於 Ruby 的開發套件
    注意

    還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 Ruby 搭配 Lambda 報告 SQS 批次項目失敗。

    require 'json' def lambda_handler(event:, context:) if event batch_item_failures = [] sqs_batch_response = {} event["Records"].each do |record| begin # process message rescue StandardError => e batch_item_failures << {"itemIdentifier" => record['messageId']} end end sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response end end
    Rust
    適用於 Rust 的 SDK
    注意

    還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

    使用 Rust 搭配 Lambda 報告 SQS 批次項目失敗。

    use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }

如果失敗的事件沒有返回佇列,請參閱如何疑難排解 Lambda 函數 SQS ReportBatchItemFailures? 在 AWS 知識中心。

成功與失敗條件

如果您的函數傳回下列任一項目,Lambda 會將批次視為完全成功:

  • 空白 batchItemFailures 清單

  • Null batchItemFailures 清單

  • 空白 EventResponse

  • Null EventResponse

如果您的函數傳回下列任一項目,Lambda 會將批次視為完全失敗:

  • 無效的 JSON 回應

  • 空白字串 itemIdentifier

  • Null itemIdentifier

  • 具有錯誤金鑰名稱的 itemIdentifier

  • 具有不存在之訊息 ID 的 itemIdentifier

CloudWatch 度量

若要判斷您的函數是否正確報告批次項目失敗,您可以在 ApproximateAgeOfOldestMessage Amazon 中監控NumberOfMessagesDeleted和 Amazon SQS 指標。 CloudWatch

  • NumberOfMessagesDeleted 會追蹤從佇列移除的訊息數目。如果下降到 0,表示您的函數回應並未正確傳回失敗訊息。

  • ApproximateAgeOfOldestMessage 會追蹤最舊訊息停留在佇列中的時間長度。此指標的急劇增加可能表示您的函數並未正確傳回失敗訊息。

Amazon SQS 組態參數

所有 Lambda 事件來源類型都共用相同CreateEventSourceMappingUpdateEventSourceMappingAPI 作業。但是,只有一些參數適用於 Amazon SQS。

適用於 Amazon SQS 的事件來源參數
參數 必要 預設 備註

BatchSize

N

10

對於標準佇列,最大值為 10,000。對於 FIFO 隊列,最大值為 10。

已啟用

N

true

EventSourceArn

Y

資料串流或串流消費者的 ARN

FunctionName

Y

FilterCriteria

N

Lambda 事件篩選

FunctionResponseTypes

N

若要讓函數報告批次中的特定失敗,請將值 ReportBatchItemFailures 包含在 FunctionResponseTypes 中。如需詳細資訊,請參閱 實作部分批次回應

MaximumBatchingWindowInSeconds

N

0

ScalingConfig

N

設定 Amazon SQS 事件來源的並行上限