AWS Lambda 與 Amazon DynamoDB 使用 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

AWS Lambda 與 Amazon DynamoDB 使用

注意

如果您想要將資料傳送到 Lambda 函數以外的目標,或在傳送資料之前豐富資料,請參閱 Amazon EventBridge 管道

您可以使用 AWS Lambda 函數來處理 Amazon DynamoDB 串流中的記錄。您可以透過 DynamoDB Streams,以在每次更新 DynamoDB 資料表時,觸發 Lambda 函數來執行額外的工作。

Lambda 會從串流讀取記錄,並透過包含串流記錄的事件同步調用函數。Lambda 會讀取批次中的記錄並調用函數,以處理來自該批次的記錄。

範例事件

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" } ]}

輪詢和批次處理串流

Lambda 會輪詢您 DynamoDB 串流中的碎片,其記錄的基本速率為每秒 4 次。當記錄可用時,Lambda 會調用您的函數,並等待結果。如果處理成功,Lambda 會恢復輪詢,直到收到多筆記錄。

Lambda 預設會在記錄可用時立即調用函數。如果 Lambda 從事件來源中讀取的批次只有一筆記錄,Lambda 只會傳送一筆記錄至函數。為避免調用具有少量記錄的函數,您可設定批次間隔,請求事件來源緩衝記錄最長達五分鐘。調用函數之前,Lambda 會繼續從事件來源中讀取記錄,直到收集到完整批次、批次間隔到期或者批次達到 6 MB 的承載限制。如需詳細資訊,請參閱 批次處理行為

警告

Lambda 事件來源對應至少處理每個事件一次,並且可能會重複處理記錄。為了避免與重複事件相關的潛在問題,我們強烈建議您將函數代碼設為冪等。若要深入了解,請參閱 AWS 知識中心如何讓 Lambda 函數具有冪等性

ParallelizationFactor設定此設定,以同時處理具有多個 Lambda 叫用的 DynamoDB 串流碎片。您可以透過從 1 (預設) 到 10 的並行化因子指定 Lambda 從碎片輪詢的並行批次數。當您增加每個碎片的並行批次數時,Lambda 仍可確保在項目 (分區和排序索引鍵) 層級進行順序處理。

輪詢和串流開始位置

請注意,建立和更新事件來源映射期間的串流輪詢最終會一致。

  • 在建立事件來源映射期間,從串流開始輪詢事件可能需要幾分鐘時間。

  • 在更新事件來源映射期間,從串流停止並重新開始輪詢事件可能需要幾分鐘時間。

這種行為表示如果您指定 LATEST 當作串流的開始位置,事件來源映射可能會在建立或更新期間遺漏事件。若要確保沒有遺漏任何事件,請將串流開始位置指定為 TRIM_HORIZON

DynamoDB Streams 中的碎片同時讀取

對於不是全域資料表的單一區域資料表,您最多可以設計兩個 Lambda 函數,以便同時讀取同一個 DynamoDB Streams 碎片。超過此限制會導致請求調節。對於全域資料表,建議您將同時函數的數量限制為一個,以避免請求限流。

執行角色許可

AWSLambdaDynamoDBExecutionRole AWS 管政策包括 Lambda 需要從您的 DynamoDB 串流讀取的許可。將此受管原則新增至函數的執行角色。

若要將失敗批次的記錄傳送到標準 SQS 佇列或標準 SNS 主題,您的函數需要額外許可。每個目的地服務都需要不同的許可,如下所示:

新增權限並建立事件來源對應

建立事件來源映射,指示 Lambda 從您的串流傳送記錄至 Lambda 函數。您可以建立多個事件來源映射,來使用多個 Lambda 函數處理相同資料,或使用單一函數處理來自多個串流的項目。

若要將您的函數設定為從 DynamoDB Streams 讀取,請將AWSLambdaDynamoDBExecutionRole AWS 受管理的原則附加至您的執行角色,然後建立 DynamoDB 觸發器。

若要新增權限並建立觸發器
  1. 開啟 Lambda 主控台中的 函數頁面

  2. 選擇函數的名稱。

  3. 依序選擇 Configuration (組態) 索引標籤和 Permissions (許可)。

  4. 在 [角色名稱] 下,選擇指向您的執行角色的連結。此連結會在 IAM 主控台中開啟角色。

    連結至執行角色
  5. 選擇新增許可,然後選擇連接政策

    在 IAM 主控台中附加政策
  6. 在搜尋欄位中輸入 AWSLambdaDynamoDBExecutionRole。將此原則新增至您的執行角色。這是一項 AWS 受管政策,其中包含您的函數需要從 DynamoDB 串流讀取的權限。如需有關此原則的詳細資訊,請參閱AWSLambdaDynamoDBExecutionRoleAWS 管理的原則參考中的。

  7. 返回 Lambda 主控台中的函數。在 函式概觀 下,選擇 新增觸發條件

    Lambda 主控台的函數概觀區段
  8. 選擇觸發條件類型。

  9. 設定需要的選項,然後選擇 新增

Lambda 針對事件來源支援下列選項:

事件來源選項
  • DynamoDB 資料表 - 從中讀取記錄的 DynamoDB 資料表。

  • 批次大小 – 每個批次中要傳送至函數的記錄數量,最高為 10,000。Lambda 會將批次中所有記錄以單一呼叫傳送至函數,前提是事件的總大小不超過同步調用的酬載限制 (6 MB)。

  • 批次間隔 - 指定調用函數前收集記錄的最長時間 (秒)。

  • 開始位置 - 只處理新記錄,或所有現有的記錄。

    • 最新 - 處理已新增到串流的記錄。

    • 水平修剪 - 處理所有在串流中的記錄。

    處理任何現有的記錄後,該函式已跟上進度並持續處理新的記錄。

  • 故障目的地 - 用於無法處理之記錄的標準 SQS 佇列或標準 SNS 主題。當 Lambda 捨棄太舊或已耗盡所有重試的一批記錄時,Lambda 會將該批次的詳細資料傳送至此佇列或主題。

  • 重試嘗試 - 當函數傳回錯誤時,Lambda 重試的次數上限。這不適用於服務錯誤或調節,其中批次並沒有到達函數。

  • 記錄最大存留期 - Lambda 傳送至函數之記錄的最大存留期。

  • 在錯誤時分割批次 - 當函數傳回錯誤時,先將批次分割為兩個,再進行重試。您原始的批次大小設定仍會維持不變。

  • 每個碎片的並行批次 - 同時處理來自同一個碎片的多個批次。

  • 已啟用 - 設定為 true 可啟用事件來源映射。設定為 false 以停止處理記錄。Lambda 會追蹤上次處理的進度,並在重新啟用映射時從該時間點恢復處理。

注意

作為 DynamoDB 觸發程序的一部分,Lambda 呼叫的 GetRecords API 呼叫不需支付費用。

若要稍後管理事件來源的組態,請選擇設計工具中的觸發。

錯誤處理

DynamoDB 事件來源對映的錯誤處理取決於在呼叫函數之前還是在函數叫用期間發生錯誤:

  • 叫用前:如果 Lambda 事件來源對應由於節流或其他問題而無法叫用函數,則會重試直到記錄到期或超過事件來源映射 () 上設定的保留天數上限。MaximumRecordAgeInSeconds

  • 叫用期間:如果叫用函數但傳回錯誤,Lambda 會重試直到記錄過期、超過最大 age (MaximumRecordAgeInSeconds) 或達到設定的重試配額 (MaximumRetryAttempts)。對於功能錯誤,您也可以配置 BisectBatchOnFunctionError,將失敗的批次分割為兩個較小的批次,隔離不良記錄並避免逾時。拆分批次不會消耗重試配額。

如果錯誤處理措施失敗,Lambda 會捨棄相應記錄,並繼續處理串流中的批次。使用預設設定時,這表示不良的記錄可能會封鎖受影響碎片上的處理長達一天。若要避免此情況,在設定函數的事件來源映射時,請使用合理的重試次數和符合您使用案例的記錄最大保留期。

設定失敗呼叫的目的地

若要保留失敗的事件來源映射調用記錄,請將目標地新增到函數的事件來源映射中。每個傳送至目的地的記錄都是 JSON 文件,其中包含有關失敗叫用的中繼資料。您可以將任何 Amazon SNS 主題或 Amazon SQS 佇列設定為目的地。您的執行角色必須具有目標的權限:

  • 對於 SQS 目的地:sq s:SendMessage

  • 對於 SNS 目的地:SNS: 發佈

若要使用主控台設定失敗時的目的地,請依照下列步驟執行:

  1. 開啟 Lambda 主控台中的 函數頁面

  2. 選擇一個函數。

  3. 函數概觀 下,選擇 新增目的地

  4. 針對來源,請選擇事件來源映射調用

  5. 對於事件來源映射,請選擇針對此函數設定的事件來源。

  6. 對於條件,選取失敗時。對於事件來源映射調用,這是唯一可接受的條件。

  7. 對於目標類型,請選擇 Lambda 將調用記錄傳送至的目標類型。

  8. 對於 目的地,請選擇一個資源。

  9. 選擇 儲存

您也可以使用 AWS Command Line Interface (AWS CLI) 來設定故障時的目的地。例如,下列create-event-source-mapping命令會將具有 SQS 失敗時目的地的事件來源對應新增至:MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

下列update-event-source-mapping命令會更新事件來源對應,以便在兩次重試嘗試後或記錄超過一個小時時,將失敗的叫用記錄傳送至 SNS 目的地。

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

系統會以非同步的方式套用更新的設定,在處理完成之前不會反映在輸出中。使用get-event-source-mapping指令檢視目前狀態。

若要移除目的地,請提供空白字串作為 destination-config 參數的引數:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

下列範例顯示 DynamoDB 串流的調用記錄。

範例 調用記錄
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

您可以使用此資訊來從串流擷取受影響的記錄,以進行疑難排解。實際的記錄不包含在內,因此您必須處理此記錄,並在因過期而遺失之前從資料串流中擷取它們。

Amazon CloudWatch 指標

當您的函數處理完一個批次的記錄時,Lambda 會發出 IteratorAge 指標。指標指出處理完成時批次中最後一個記錄的存在時間。如果您的函數正在處理新的事件,可使用迭代器存留期來預估記錄新增與函數實際處理之間的延遲。

從迭代器存留期的增加趨勢可看出您函式的問題。如需詳細資訊,請參閱 使用 Lambda 函數指標

時段

Lambda 函數可執行持續串流處理應用程式。串流表示持續在應用程式中流動的無限制資料。若要分析此持續更新輸入中的資訊,您可以使用定義的時段來限制包含的記錄。

輪轉時段是定期開啟和關閉的不同時段。依預設,Lambda 調用是無狀態的,您無法在沒有外部資料庫的情況下,將其用於處理多個持續調用的資料。然而,使用輪轉時段,您可以在不同的調用間維護狀態。此狀態包含之前為目前時段處理之訊息的彙總結果。狀態可以是每個分區最多 1 MB。如果超過該大小,則 Lambda 會提前終止時段。

串流中的每個記錄都屬於一個特定時段。Lambda 至少會處理一次每筆記錄,但不保證每筆記錄只會處理一次。在極少數情況下,例如錯誤處理,某些記錄可能會處理多次。第一次時一律會依序處理記錄。如果多次處理記錄,則可能不會按順序處理。

彙總與處理

調用您的使用者管理函數進行彙總,以及處理該彙總的最終結果。Lambda 會彙總時段中接收的所有記錄。您可以在多個批次中接收這些記錄,各自作為單獨的調用。每次調用會收到一個狀態。因此,當使用輪轉時段時,您的 Lambda 函數回應必須包含 state 屬性。如果回應不包含 state 屬性,Lambda 會將此視為失敗的調用。為了滿足此條件,您的函數可以返回一個 TimeWindowEventResponse 物件,它具有下列 JSON 形狀:

範例 TimeWindowEventResponse
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
注意

對於 Java 函數,我們建議使用 Map<String, String> 來表示狀態。

在時段結束時,標記 isFinalInvokeForWindow 會設定為 true 以指示這是最終狀態,並且可隨時進行處理。處理完成後,時段結束並完成最終調用,然後丟棄該狀態。

在時段結束時,Lambda 會針對彙總結果上的動作使用最終處理。您的最終處理將同步調用。成功調用後,您的函數檢查點序號和串流處理將會繼續。如果調用失敗,則您的 Lambda 函數會暫停進一步處理,直至成功調用。

範例 DynamodbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

組態

您可以在建立或更新事件來源對映時設定輪轉時段。若要設定暫停視窗,請以秒為單位指定視窗 (TumblingWindowInSeconds)。下列範例 AWS Command Line Interface (AWS CLI) 命令會建立具有 120 秒暫停視窗的串流事件來源對應。針對彙總與處理定義的 Lambda 函數命名為 tumbling-window-example-function

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds 120

Lambda 根據記錄插入串流的時間,確定輪轉時段邊界。所有記錄都有 Lambda 在邊界確定中使用的近似時間戳記。

輪轉時段彙總不支援重新分區。分區結束後,Lambda 會考慮關閉時段,並且子分區會以新的狀態開始自己的時段。

輪轉時段完全支援現有的重試政策 maxRetryAttemptsmaxRecordAge

範例 Handler.py - 彙總與處理

下列 Python 函數示範了如何彙總,然後處理您的最終狀態:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}

報告批次項目失敗

取用和處理事件來源的串流資料時,依預設,只有在批次成功完成時,Lambda 檢查點才會到批次的最高序號。Lambda 會將所有其他結果視為完全失敗,並重試處理批次,直至達到重試限制。若要在處理串流的批次時允許部分成功,請開啟 ReportBatchItemFailures。允許部分成功有助於減少記錄的重試次數,但其不會完全消除在成功記錄中重試的可能性。

若要開啟ReportBatchItemFailures,請在清單ReportBatchItemFailures中包含FunctionResponseTypes列舉值。此清單指示已為您的函數啟用哪些回應類型。您可以在建立更新事件來源對應時設定此清單。

報告語法

設定批次項目失敗的報告時,會傳回 StreamsEventResponse 類別,其中包含批次項目失敗的清單。您可以使用 StreamsEventResponse 物件,來傳回批次中第一個失敗記錄的序號。您還可以使用正確的回應語法,建立自己的自訂類別。下列 JSON 結構顯示所需的回應語法:

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
注意

如果 batchItemFailures 陣列包含多個項目,則 Lambda 會使用具有最低序列號的記錄作為檢查點。然後,Lambda 會重試從該檢查點開始的所有記錄。

成功與失敗條件

如果您傳回下列任一項目,Lambda 會將批次視為完全成功:

  • 空白 batchItemFailure 清單

  • Null batchItemFailure 清單

  • 空白 EventResponse

  • Null EventResponse

如果您傳回下列任一項目,Lambda 會將批次視為完全失敗:

  • 空白字串 itemIdentifier

  • Null itemIdentifier

  • 具有錯誤金鑰名稱的 itemIdentifier

Lambda 會根據您的重試政策來重試失敗。

將批次平分

如果您的調用失敗且 BisectBatchOnFunctionError 已開啟,則無論您的 ReportBatchItemFailures 設定如何,批次都會被平分。

收到部分批次成功回應且 BisectBatchOnFunctionErrorReportBatchItemFailures 均開啟時,批次會依傳回的序號進行平分,並且 Lambda 僅會重試剩餘的記錄。

以下範例函數程式碼會傳回批次中失敗訊息 ID 的清單:

.NET
AWS SDK for .NET
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 .NET 報告使用 Lambda 批次項目失敗。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>(); StreamsEventResponse streamsEventResponse = new StreamsEventResponse(); foreach (var record in dynamoEvent.Records) { try { var sequenceNumber = record.Dynamodb.SequenceNumber; context.Logger.LogInformation(sequenceNumber); } catch (Exception ex) { context.Logger.LogError(ex.Message); batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber }); } } if (batchItemFailures.Count > 0) { streamsEventResponse.BatchItemFailures = batchItemFailures; } context.Logger.LogInformation("Stream processing complete."); return streamsEventResponse; } }
Go
SDK for Go V2
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Go 使用 Lambda 報告批次項目失敗。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type BatchItemFailure struct { ItemIdentifier string `json:"ItemIdentifier"` } type BatchResult struct { BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"` } func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) { var batchItemFailures []BatchItemFailure curRecordSequenceNumber := "" for _, record := range event.Records { // Process your record curRecordSequenceNumber = record.Change.SequenceNumber } if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber}) } batchResult := BatchResult{ BatchItemFailures: batchItemFailures, } return &batchResult, nil } func main() { lambda.Start(HandleRequest) }
Java
適用於 Java 2.x 的 SDK
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Java 使用 Lambda 報告批次項目失敗。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) { try { //Process your record StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(); } }
JavaScript
適用於 JavaScript (v3) 的開發套件
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Lambda 使用報告批次項目失敗 JavaScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 export const handler = async (event) => { const records = event.Records; let curRecordSequenceNumber = ""; for (const record of records) { try { // Process your record curRecordSequenceNumber = record.dynamodb.SequenceNumber; } catch (e) { // Return failed record's sequence number return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] }; } } return { batchItemFailures: [] }; };

使用 Lambda 使用報告批次項目失敗 TypeScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { DynamoDBBatchResponse, DynamoDBBatchItemFailure, DynamoDBStreamEvent, } from "aws-lambda"; export const handler = async ( event: DynamoDBStreamEvent ): Promise<DynamoDBBatchResponse> => { const batchItemFailures: DynamoDBBatchItemFailure[] = []; let curRecordSequenceNumber; for (const record of event.Records) { curRecordSequenceNumber = record.dynamodb?.SequenceNumber; if (curRecordSequenceNumber) { batchItemFailures.push({ itemIdentifier: curRecordSequenceNumber, }); } } return { batchItemFailures: batchItemFailures }; };
PHP
適用於 PHP 的開發套件
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 PHP 使用 Lambda 報告批次項目失敗。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\DynamoDb\DynamoDbEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $dynamoDbEvent = new DynamoDbEvent($event); $this->logger->info("Processing records"); $records = $dynamoDbEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getSequenceNumber(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); // change format for the response $failures = array_map( fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger);
Python
適用於 Python (Boto3) 的 SDK
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Python 使用 Lambda 報告批次項目失敗。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
Ruby
適用於 Ruby 的開發套件
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用紅寶石使用 Lambda 報告批次項目失敗。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event:, context:) records = event["Records"] cur_record_sequence_number = "" records.each do |record| begin # Process your record cur_record_sequence_number = record["dynamodb"]["SequenceNumber"] rescue StandardError => e # Return failed record's sequence number return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]} end end {"batchItemFailures" => []} end
Rust
適用於 Rust 的 SDK
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Rust 使用 Lambda 報告批次項目失敗。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::dynamodb::{Event, EventRecord, StreamRecord}, streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; /// Process the stream record fn process_record(record: &EventRecord) -> Result<(), Error> { let stream_record: &StreamRecord = &record.change; // process your stream record here... tracing::info!("Data: {:?}", stream_record); Ok(()) } /// Main Lambda handler here... async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> { let mut response = DynamoDbEventResponse { batch_item_failures: vec![], }; let records = &event.payload.records; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in records { tracing::info!("EventId: {}", record.event_id); // Couldn't find a sequence number if record.change.sequence_number.is_none() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: Some("".to_string()), }); return Ok(response); } // Process your record here... if process_record(record).is_err() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: record.change.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!("Successfully processed {} record(s)", records.len()); Ok(response) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

Amazon DynamoDB Streams 組態參數

所有 Lambda 事件來源類型都共用相同CreateEventSourceMappingUpdateEventSourceMappingAPI 作業。但是,只有一些參數適用於 DynamoDB Streams。

適用於 DynamoDB Streams 的事件來源參數
參數 必要 預設 備註

BatchSize

100

上限:10,000

BisectBatchOnFunctionError

N

false

DestinationConfig

N

捨棄記錄的標準 Amazon SQS 佇列或標準 Amazon SNS 主題目的地

已啟用

N

true

EventSourceArn

Y

資料串流或串流消費者的 ARN

FilterCriteria

N

Lambda 事件篩選

FunctionName

Y

FunctionResponseTypes

N

若要讓函數報告批次中的特定失敗,請將值 ReportBatchItemFailures 包含在 FunctionResponseTypes 中。如需詳細資訊,請參閱 報告批次項目失敗

MaximumBatchingWindowInSeconds

N

0

MaximumRecordAgeInSeconds

N

-1

-1 表示 infinite:失敗的記錄會重試,直到記錄過期為止。DynamoDB Streams 的資料保留限制為 24 小時。

下限:-1

上限:604,800

MaximumRetryAttempts

N

-1

-1 代表無限:系統會重試失敗的記錄,直到記錄過期為止

下限:0

上限:10,000

ParallelizationFactor

N

1

上限:10

StartingPosition

Y

TRIM_HORIZON 或 LATEST

TumblingWindowInSeconds

N

下限:0

上限:900