本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
Lambda 事件來源映射
注意
如果您想要將資料傳送到 Lambda 函數以外的目標,或在傳送資料之前豐富資料,請參閱 Amazon EventBridge 管道。
事件來源映射是一種 Lambda 資源,它可從事件來源中讀取,並調用 Lambda 函數。您可以使用事件來源映射,以在不會直接調用 Lambda 函數的服務中處理串流或佇列中的項目。本頁說明 Lambda 提供事件來源映射的服務,以及如何微調批次處理行為。
Lambda 從中讀取事件的服務
事件來源映射會使用函式的執行角色許可來讀取和管理事件來源中的項目。許可、事件結構、設定和輪詢行為因事件來源而異。如需詳細資訊,請參閱您做為事件來源使用之服務的連結主題。
若要使用 AWS Command Line Interface (AWS CLI) 或 AWS SDK
警告
Lambda 事件來源對應至少處理每個事件一次,而且可能會重複處理批次。為了避免與重複事件相關的潛在問題,我們強烈建議您將函數代碼設為冪等。若要深入了解,請參閱 AWS 知識中心如何讓 Lambda 函數具有冪等性
建立事件來源對應
若要在事件來源和 Lambda 函數之間建立對應,請在主控台中建立觸發器或使用create-event-source-mapping
若要新增權限並建立觸發器
若要建立事件來源對應 (AWS CLI)
下列範例會使用 AWS CLI 名為的函數對應my-function
至其 Amazon 資源名稱 (ARN) 指定的 DynamoDB 串流,批次大小為 500。
aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --maximum-batching-window-in-seconds 5 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:
us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525
您應該會看到下列輸出:
{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 5, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }
更新事件來源對應
更新事件來源對應 (主控台)
開啟 Lambda 主控台中的 函數頁面
。 -
選擇一個函數。
-
選擇組態,然後選擇「觸發程式」
-
選取觸發器,然後選擇 [編輯]。
更新事件來源對應 (AWS CLI)
使用 update-event-source-mapping
aws lambda update-event-source-mapping \ --uuid
"a1b2c3d4-5678-90ab-cdef-11111EXAMPLE"
\ --scaling-config'{"MaximumConcurrency":5}'
刪除事件來源對應
刪除函數時,Lambda 不會刪除關聯的事件來源對應。您可以在主控台或使用 DeleteEventSourceMappingAPI 動作刪除事件來源對應。
若要刪除事件來源對應 (主控台)
-
開啟 Lambda 主控台的事件來源對應頁面
。 -
選取您要刪除的事件來源對映。
-
在 [刪除事件來源對應] 對話方塊中,輸入 delete,然後選擇 [刪除]。
若要刪除事件來源對應 (AWS CLI)
使用 delete-event-source-mapping
aws lambda delete-event-source-mapping \ --uuid
a1b2c3d4-5678-90ab-cdef-11111EXAMPLE
批次處理行為
事件來源映射從目標事件來源讀取項目。根據預設,事件來源映射會將記錄批次處理到 Lambda 傳送給函數的單個承載中。要微調批次處理行為,您可以設定一個批次間隔 (MaximumBatchingWindowInSeconds
) 和批次大小 (BatchSize
)。批次間隔是將記錄收集到單一承載的最長時間。批次大小是單一批次中記錄的最大數目。當下列三個條件之一成立時,Lambda 會調用您的函數:
-
批次間隔達到其最大值。預設的批次處理視窗行為會根據特定的事件來源而有所不同。
對於 Kinesis、DynamoDB 和 Amazon SQS 事件來源:預設批次間隔為 0 秒。這意味著 Lambda 只有在滿足批次大小或達到有效負載大小限制時,才會將批次傳送至您的函數。若要設定批次化視窗,請進行設定
MaximumBatchingWindowInSeconds
。您可以將此參數設定為 0 到 300 秒之間的任何值,以 1 秒為增量。如果您設定批次處理視窗,則下一個視窗會在前一個函數呼叫完成後立即開始。如果事件來源是 Amazon MSK、自主管理 Apache Kafka 、Amazon MQ 以及 Amazon DocumentDB:預設批次處理時段為 500 毫秒。您可以將
MaximumBatchingWindowInSeconds
設定為從 0 秒到 300 秒之間的任意值,增量為秒。一旦第一條記錄到達,批次間隔就會開始。注意
因為您只能變更以秒為增量的
MaximumBatchingWindowInSeconds
,所以在變更之後無法恢復到 500 毫秒的預設批次間隔。要恢復預設批次間隔,必須建立新的事件來源映射。
-
已滿足批次大小。批次大小下限為 1。預設和最大批次大小取決於事件來源。如需這些值的詳細資訊,請參閱
CreateEventSourceMapping
API 操作的BatchSize
規格。 -
承載大小達到 6 MB。您無法修改此限制。
下圖說明了這三種情況。假設批次間隔從第 t = 7
秒開始。在第一種情況下,批次間隔在累積 5 條記錄後在第 t = 47
秒達到其最大值 40 秒。在第二種情況下,批次大小在批次間隔到期之前達到 10,因此批次間隔提前結束。在第三種情況下,承載大小在批次間隔到期之前達到最大值,因此批次間隔提前結束。
以下範例顯示從 Kinesis 串流進行讀取的事件來源映射。如果事件批次的所有處理嘗試皆失敗,事件來源映射會將批次的詳細資訊傳送到 SQS 佇列。
事件批次是 Lambda 傳送到函數的事件。它是從事件來源映射讀取的項目中編譯的一批記錄或訊息,直到當前批次間隔過期為止。
對於 Kinesis 和 DynamoDB 串流,事件來源映射會針對串流中的每個碎片建立迭代器,並依序處理每個碎片中的項目。您可以將事件來源映射設定為只讀取串流中出現的新項目,或從較舊的項目開始讀取。已處理的項目不會從串流中移除,且其他函數或取用者可以處理這些項目。
Lambda 在傳送下一批進行處理前不會等待任何已設定的 Lambda 延伸 完成。換句話說,當 Lambda 處理下一批記錄時,您的擴充功能可能會繼續執行。如果您違反任何帳戶的 並行 設定或限制,便可能會產生限流的問題。若要偵測此是否為潛在問題,請監控您的函數,並確認您看到的 並行指標 是否高於事件來源映射的預期值。由於兩次調用之間的時間很短,Lambda 可能會短暫報告比碎片數目更高的並行用量。即使對於沒有延伸項目的 Lambda 函數也可能如此。
根據預設,如果您的函數傳回錯誤,則事件來源映射會重新處理整個批次,直到函數成功,或批次中的項目過期為止。若要確保依序處理,事件來源映射會暫停處理受影響的碎片,直到錯誤解決為止。您可以設定事件來源映射,捨棄舊的事件,或是平行處理多個批次。如果您以並行方式處理多個批次,則仍然會保證每個分割區索引鍵的依序處理,但事件來源映射會在相同碎片中同時處理多個分割區索引鍵。
針對串流來源 (DynamoDB 和 Kinesis),您可以設定函數傳回錯誤時 Lambda 重試的次數上限。導致批次未抵達函數的服務錯誤或限流不會計入重試嘗試次數。
您也可以設定事件來源映射,在捨棄事件批次時將調用記錄傳送至另一個服務。Lambda 對事件來源映射支援以下目的地。
-
Amazon SQS— SQS 佇列。
-
Amazon SNS - SNS 主題。
呼叫記錄包含以 JSON 格式呈現的失敗事件批次詳細資訊。
下列範例顯示 Kinesis 串流的調用記錄。
範例 呼叫記錄
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }
Lambda 也支援 FIFO (先進先出) 佇列的循序處理,並可以向上擴展至作用中訊息群組的數量。對於標準佇列,項目不一定要依序處理。Lambda 會向上擴展,以盡快處理標準佇列。發生錯誤時,Lambda 會以個別項目的形式將批次傳回佇列,並可以透過與原始批次不同的分組方式來處理。有時候,即使沒有發生函數錯誤,事件來源映射也可能收到佇列中相同的項目兩次。Lambda 會在成功處理項目後,從佇列中刪除它們。您可以設定來源佇列,以將項目在 Lambda 無法處理時傳送至無效字母佇列或目的地。
如需有關直接調用 Lambda 函數的服務詳細資訊,請參閱 AWS Lambda 搭配其他服務使用。
設定事件來源映射調用的目的地
若要保留失敗的事件來源映射調用記錄,請將目標地新增到函數的事件來源映射中。僅支援 Kinesis、DynamoDB 和 Kafka 型事件來源的事件來源映射調用設定目的地。傳送至目的地的每筆記錄都是包含調用之詳細資訊的 JSON 文件。與錯誤處理設定相似,您可以設定函數、函數版本或是別名的目標。
注意
對於事件來源映射調用,您只能保留失敗調用的記錄。對於其他非同步調用,您可以保留成功和失敗調用的記錄。如需詳細資訊,請參閱 設定非同步調用的目的地。
您可以將任何 Amazon SNS 主題或任何 Amazon SQS 佇列設定為目的地。對於這些目的地類型,Lambda 會將記錄中繼資料傳送至目的地。僅對於以 Kafka 為基礎的事件來源,您也可以選擇 Amazon S3 儲存貯體做為目的地。如果您指定 S3 儲存貯體,Lambda 會將整個調用記錄與中繼資料一起傳送至目的地。
以下的資料表摘要出說明事件來源映射調用所支援的目的地類型。若要讓 Lambda 成功將記錄傳送至您選擇的目的地,請確定函數的執行角色也包含相關許可。此資料表也說明每個目的地類型如何接收 JSON 調用記錄。
目的地類型 | 支援下列事件來源 | 所需的許可 | 目的地特定 JSON 格式 |
---|---|---|---|
Amazon SQS 佇列 |
|
Lambda 會將調用記錄中繼資料做為 |
|
Amazon SNS 主題 |
|
Lambda 會將調用記錄中繼資料做為 |
|
Amazon S3 儲存貯體 |
|
Lambda 會將調用記錄及其中繼資料儲存在目的地。 |
下列範例顯示 Lambda 針對失敗的 Kinesis 事件來源調用而傳送至 SQS 佇列或 SNS 主題。由於 Lambda 只會傳送這些目標類型的中繼資料,因此請使用streamArn
、shardId
、startSequenceNumber
和 endSequenceNumber
欄位來取得完整的原始記錄。
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }
如需 DynamoDB 事件來源的範例,請參閱 錯誤處理。如需 Kafka 事件來源的範例,請參閱自我管理 Apache Kafka 的失敗時目的地,或 Amazon MSK 的失敗時目的地。