本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
AWS Lambda 與 Amazon DynamoDB 使用
注意
如果您想要將資料傳送到 Lambda 函數以外的目標,或在傳送資料之前豐富資料,請參閱 Amazon EventBridge 管道。
您可以使用 AWS Lambda 函數來處理 Amazon DynamoDB 串流中的記錄。您可以透過 DynamoDB Streams,以在每次更新 DynamoDB 資料表時,觸發 Lambda 函數來執行額外的工作。
Lambda 會從串流讀取記錄,並透過包含串流記錄的事件同步調用函數。Lambda 會讀取批次中的記錄並調用函數,以處理來自該批次的記錄。
章節
範例事件
{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" } ]}
輪詢和批次處理串流
Lambda 會輪詢您 DynamoDB 串流中的碎片,其記錄的基本速率為每秒 4 次。當記錄可用時,Lambda 會調用您的函數,並等待結果。如果處理成功,Lambda 會恢復輪詢,直到收到多筆記錄。
Lambda 預設會在記錄可用時立即調用函數。如果 Lambda 從事件來源中讀取的批次只有一筆記錄,Lambda 只會傳送一筆記錄至函數。為避免調用具有少量記錄的函數,您可設定批次間隔,請求事件來源緩衝記錄最長達五分鐘。調用函數之前,Lambda 會繼續從事件來源中讀取記錄,直到收集到完整批次、批次間隔到期或者批次達到 6 MB 的承載限制。如需詳細資訊,請參閱 批次處理行為。
警告
Lambda 事件來源對應至少處理每個事件一次,並且可能會重複處理記錄。為了避免與重複事件相關的潛在問題,我們強烈建議您將函數代碼設為冪等。若要深入了解,請參閱 AWS 知識中心如何讓 Lambda 函數具有冪等性
ParallelizationFactor設定此設定,以同時處理具有多個 Lambda 叫用的 DynamoDB 串流碎片。您可以透過從 1 (預設) 到 10 的並行化因子指定 Lambda 從碎片輪詢的並行批次數。當您增加每個碎片的並行批次數時,Lambda 仍可確保在項目 (分區和排序索引鍵) 層級進行順序處理。
輪詢和串流開始位置
請注意,建立和更新事件來源映射期間的串流輪詢最終會一致。
-
在建立事件來源映射期間,從串流開始輪詢事件可能需要幾分鐘時間。
-
在更新事件來源映射期間,從串流停止並重新開始輪詢事件可能需要幾分鐘時間。
這種行為表示如果您指定 LATEST
當作串流的開始位置,事件來源映射可能會在建立或更新期間遺漏事件。若要確保沒有遺漏任何事件,請將串流開始位置指定為 TRIM_HORIZON
。
DynamoDB Streams 中的碎片同時讀取
對於不是全域資料表的單一區域資料表,您最多可以設計兩個 Lambda 函數,以便同時讀取同一個 DynamoDB Streams 碎片。超過此限制會導致請求調節。對於全域資料表,建議您將同時函數的數量限制為一個,以避免請求限流。
執行角色許可
受AWSLambdaDynamoDBExecutionRole AWS 管政策包括 Lambda 需要從您的 DynamoDB 串流讀取的許可。將此受管原則新增至函數的執行角色。
若要將失敗批次的記錄傳送到標準 SQS 佇列或標準 SNS 主題,您的函數需要額外許可。每個目的地服務都需要不同的許可,如下所示:
Amazon SQS-平方米:SendMessage
Amazon SNS–sns:Publish
新增權限並建立事件來源對應
建立事件來源映射,指示 Lambda 從您的串流傳送記錄至 Lambda 函數。您可以建立多個事件來源映射,來使用多個 Lambda 函數處理相同資料,或使用單一函數處理來自多個串流的項目。
若要將您的函數設定為從 DynamoDB Streams 讀取,請將AWSLambdaDynamoDBExecutionRole AWS 受管理的原則附加至您的執行角色,然後建立 DynamoDB 觸發器。
若要新增權限並建立觸發器
開啟 Lambda 主控台中的 函數頁面
。 -
選擇函數的名稱。
-
依序選擇 Configuration (組態) 索引標籤和 Permissions (許可)。
-
在 [角色名稱] 下,選擇指向您的執行角色的連結。此連結會在 IAM 主控台中開啟角色。
-
選擇新增許可,然後選擇連接政策。
-
在搜尋欄位中輸入
AWSLambdaDynamoDBExecutionRole
。將此原則新增至您的執行角色。這是一項 AWS 受管政策,其中包含您的函數需要從 DynamoDB 串流讀取的權限。如需有關此原則的詳細資訊,請參閱AWSLambdaDynamoDBExecutionRole受AWS 管理的原則參考中的。 -
返回 Lambda 主控台中的函數。在 函式概觀 下,選擇 新增觸發條件 。
-
選擇觸發條件類型。
-
設定需要的選項,然後選擇 新增 。
Lambda 針對事件來源支援下列選項:
事件來源選項
-
DynamoDB 資料表 - 從中讀取記錄的 DynamoDB 資料表。
-
批次大小 – 每個批次中要傳送至函數的記錄數量,最高為 10,000。Lambda 會將批次中所有記錄以單一呼叫傳送至函數,前提是事件的總大小不超過同步調用的酬載限制 (6 MB)。
批次間隔 - 指定調用函數前收集記錄的最長時間 (秒)。
-
開始位置 - 只處理新記錄,或所有現有的記錄。
-
最新 - 處理已新增到串流的記錄。
-
水平修剪 - 處理所有在串流中的記錄。
處理任何現有的記錄後,該函式已跟上進度並持續處理新的記錄。
-
故障目的地 - 用於無法處理之記錄的標準 SQS 佇列或標準 SNS 主題。當 Lambda 捨棄太舊或已耗盡所有重試的一批記錄時,Lambda 會將該批次的詳細資料傳送至此佇列或主題。
重試嘗試 - 當函數傳回錯誤時,Lambda 重試的次數上限。這不適用於服務錯誤或調節,其中批次並沒有到達函數。
記錄最大存留期 - Lambda 傳送至函數之記錄的最大存留期。
在錯誤時分割批次 - 當函數傳回錯誤時,先將批次分割為兩個,再進行重試。您原始的批次大小設定仍會維持不變。
每個碎片的並行批次 - 同時處理來自同一個碎片的多個批次。
-
已啟用 - 設定為 true 可啟用事件來源映射。設定為 false 以停止處理記錄。Lambda 會追蹤上次處理的進度,並在重新啟用映射時從該時間點恢復處理。
注意
作為 DynamoDB 觸發程序的一部分,Lambda 呼叫的 GetRecords API 呼叫不需支付費用。
若要稍後管理事件來源的組態,請選擇設計工具中的觸發。
錯誤處理
DynamoDB 事件來源對映的錯誤處理取決於在呼叫函數之前還是在函數叫用期間發生錯誤:
-
叫用前:如果 Lambda 事件來源對應由於節流或其他問題而無法叫用函數,則會重試直到記錄到期或超過事件來源映射 () 上設定的保留天數上限。MaximumRecordAgeInSeconds
-
叫用期間:如果叫用函數但傳回錯誤,Lambda 會重試直到記錄過期、超過最大 age (MaximumRecordAgeInSeconds) 或達到設定的重試配額 (MaximumRetryAttempts)。對於功能錯誤,您也可以配置 BisectBatchOnFunctionError,將失敗的批次分割為兩個較小的批次,隔離不良記錄並避免逾時。拆分批次不會消耗重試配額。
如果錯誤處理措施失敗,Lambda 會捨棄相應記錄,並繼續處理串流中的批次。使用預設設定時,這表示不良的記錄可能會封鎖受影響碎片上的處理長達一天。若要避免此情況,在設定函數的事件來源映射時,請使用合理的重試次數和符合您使用案例的記錄最大保留期。
設定失敗呼叫的目的地
若要保留失敗的事件來源映射調用記錄,請將目標地新增到函數的事件來源映射中。每個傳送至目的地的記錄都是 JSON 文件,其中包含有關失敗叫用的中繼資料。您可以將任何 Amazon SNS 主題或 Amazon SQS 佇列設定為目的地。您的執行角色必須具有目標的權限:
若要使用主控台設定失敗時的目的地,請依照下列步驟執行:
開啟 Lambda 主控台中的 函數頁面
。 -
選擇一個函數。
-
在 函數概觀 下,選擇 新增目的地。
-
針對來源,請選擇事件來源映射調用。
-
對於事件來源映射,請選擇針對此函數設定的事件來源。
-
對於條件,選取失敗時。對於事件來源映射調用,這是唯一可接受的條件。
-
對於目標類型,請選擇 Lambda 將調用記錄傳送至的目標類型。
-
對於 目的地,請選擇一個資源。
-
選擇 儲存 。
您也可以使用 AWS Command Line Interface (AWS CLI) 來設定故障時的目的地。例如,下列create-event-source-mappingMyFunction
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
下列update-event-source-mapping
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
系統會以非同步的方式套用更新的設定,在處理完成之前不會反映在輸出中。使用get-event-source-mapping
若要移除目的地,請提供空白字串作為 destination-config
參數的引數:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
下列範例顯示 DynamoDB 串流的調用記錄。
範例 調用記錄
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }
您可以使用此資訊來從串流擷取受影響的記錄,以進行疑難排解。實際的記錄不包含在內,因此您必須處理此記錄,並在因過期而遺失之前從資料串流中擷取它們。
Amazon CloudWatch 指標
當您的函數處理完一個批次的記錄時,Lambda 會發出 IteratorAge
指標。指標指出處理完成時批次中最後一個記錄的存在時間。如果您的函數正在處理新的事件,可使用迭代器存留期來預估記錄新增與函數實際處理之間的延遲。
從迭代器存留期的增加趨勢可看出您函式的問題。如需詳細資訊,請參閱 使用 Lambda 函數指標。
時段
Lambda 函數可執行持續串流處理應用程式。串流表示持續在應用程式中流動的無限制資料。若要分析此持續更新輸入中的資訊,您可以使用定義的時段來限制包含的記錄。
輪轉時段是定期開啟和關閉的不同時段。依預設,Lambda 調用是無狀態的,您無法在沒有外部資料庫的情況下,將其用於處理多個持續調用的資料。然而,使用輪轉時段,您可以在不同的調用間維護狀態。此狀態包含之前為目前時段處理之訊息的彙總結果。狀態可以是每個分區最多 1 MB。如果超過該大小,則 Lambda 會提前終止時段。
串流中的每個記錄都屬於一個特定時段。Lambda 至少會處理一次每筆記錄,但不保證每筆記錄只會處理一次。在極少數情況下,例如錯誤處理,某些記錄可能會處理多次。第一次時一律會依序處理記錄。如果多次處理記錄,則可能不會按順序處理。
彙總與處理
調用您的使用者管理函數進行彙總,以及處理該彙總的最終結果。Lambda 會彙總時段中接收的所有記錄。您可以在多個批次中接收這些記錄,各自作為單獨的調用。每次調用會收到一個狀態。因此,當使用輪轉時段時,您的 Lambda 函數回應必須包含 state
屬性。如果回應不包含 state
屬性,Lambda 會將此視為失敗的調用。為了滿足此條件,您的函數可以返回一個 TimeWindowEventResponse
物件,它具有下列 JSON 形狀:
範例 TimeWindowEventResponse
值
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
注意
對於 Java 函數,我們建議使用 Map<String, String>
來表示狀態。
在時段結束時,標記 isFinalInvokeForWindow
會設定為 true
以指示這是最終狀態,並且可隨時進行處理。處理完成後,時段結束並完成最終調用,然後丟棄該狀態。
在時段結束時,Lambda 會針對彙總結果上的動作使用最終處理。您的最終處理將同步調用。成功調用後,您的函數檢查點序號和串流處理將會繼續。如果調用失敗,則您的 Lambda 函數會暫停進一步處理,直至成功調用。
範例 DynamodbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }
組態
您可以在建立或更新事件來源對映時設定輪轉時段。若要設定暫停視窗,請以秒為單位指定視窗 (TumblingWindowInSeconds)。下列範例 AWS Command Line Interface (AWS CLI) 命令會建立具有 120 秒暫停視窗的串流事件來源對應。針對彙總與處理定義的 Lambda 函數命名為 tumbling-window-example-function
。
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds
120
Lambda 根據記錄插入串流的時間,確定輪轉時段邊界。所有記錄都有 Lambda 在邊界確定中使用的近似時間戳記。
輪轉時段彙總不支援重新分區。分區結束後,Lambda 會考慮關閉時段,並且子分區會以新的狀態開始自己的時段。
輪轉時段完全支援現有的重試政策 maxRetryAttempts
和 maxRecordAge
。
範例 Handler.py - 彙總與處理
下列 Python 函數示範了如何彙總,然後處理您的最終狀態:
def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}
報告批次項目失敗
取用和處理事件來源的串流資料時,依預設,只有在批次成功完成時,Lambda 檢查點才會到批次的最高序號。Lambda 會將所有其他結果視為完全失敗,並重試處理批次,直至達到重試限制。若要在處理串流的批次時允許部分成功,請開啟 ReportBatchItemFailures
。允許部分成功有助於減少記錄的重試次數,但其不會完全消除在成功記錄中重試的可能性。
若要開啟ReportBatchItemFailures
,請在清單ReportBatchItemFailures
中包含FunctionResponseTypes列舉值。此清單指示已為您的函數啟用哪些回應類型。您可以在建立或更新事件來源對應時設定此清單。
報告語法
設定批次項目失敗的報告時,會傳回 StreamsEventResponse
類別,其中包含批次項目失敗的清單。您可以使用 StreamsEventResponse
物件,來傳回批次中第一個失敗記錄的序號。您還可以使用正確的回應語法,建立自己的自訂類別。下列 JSON 結構顯示所需的回應語法:
{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
注意
如果 batchItemFailures
陣列包含多個項目,則 Lambda 會使用具有最低序列號的記錄作為檢查點。然後,Lambda 會重試從該檢查點開始的所有記錄。
成功與失敗條件
如果您傳回下列任一項目,Lambda 會將批次視為完全成功:
-
空白
batchItemFailure
清單 -
Null
batchItemFailure
清單 -
空白
EventResponse
-
Null
EventResponse
如果您傳回下列任一項目,Lambda 會將批次視為完全失敗:
-
空白字串
itemIdentifier
-
Null
itemIdentifier
-
具有錯誤金鑰名稱的
itemIdentifier
Lambda 會根據您的重試政策來重試失敗。
將批次平分
如果您的調用失敗且 BisectBatchOnFunctionError
已開啟,則無論您的 ReportBatchItemFailures
設定如何,批次都會被平分。
收到部分批次成功回應且 BisectBatchOnFunctionError
和 ReportBatchItemFailures
均開啟時,批次會依傳回的序號進行平分,並且 Lambda 僅會重試剩餘的記錄。
以下範例函數程式碼會傳回批次中失敗訊息 ID 的清單:
Amazon DynamoDB Streams 組態參數
所有 Lambda 事件來源類型都共用相同CreateEventSourceMapping的 UpdateEventSourceMappingAPI 作業。但是,只有一些參數適用於 DynamoDB Streams。
適用於 DynamoDB Streams 的事件來源參數 | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
參數 | 必要 | 預設 | 備註 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BatchSize |
否 |
100 |
上限:10,000 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BisectBatchOnFunctionError |
N |
false |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
DestinationConfig |
N |
捨棄記錄的標準 Amazon SQS 佇列或標準 Amazon SNS 主題目的地 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
已啟用 |
N |
true |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EventSourceArn |
Y |
資料串流或串流消費者的 ARN |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FilterCriteria |
N |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FunctionName |
Y |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FunctionResponseTypes |
N |
若要讓函數報告批次中的特定失敗,請將值 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
MaximumBatchingWindowInSeconds |
N |
0 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
MaximumRecordAgeInSeconds |
N |
-1 |
-1 表示 infinite:失敗的記錄會重試,直到記錄過期為止。DynamoDB Streams 的資料保留限制為 24 小時。 下限:-1 上限:604,800 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
MaximumRetryAttempts |
N |
-1 |
-1 代表無限:系統會重試失敗的記錄,直到記錄過期為止 下限:0 上限:10,000 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ParallelizationFactor |
N |
1 |
上限:10 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
StartingPosition |
Y |
TRIM_HORIZON 或 LATEST |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TumblingWindowInSeconds |
N |
下限:0 上限:900 |