Lambda 如何處理來自 Amazon Kinesis Data Streams 的記錄 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Lambda 如何處理來自 Amazon Kinesis Data Streams 的記錄

您可以使用 Lambda 函數來處理 Amazon Kinesis 資料串流中的記錄。您可以將 Lambda 函數對應至 Kinesis Data Streams 共用輸送量取用者 (標準迭代器),或對應至具有增強型散發功能的專用輸送量取用者。對於標準迭代器,Lambda 會使用通訊協定輪詢 Kinesis 串流中的每個碎片,以取得記錄。HTTP事件來源映射會與碎片的其他取用者共用讀取傳輸量。

如需 Kinesis 資料串流的詳細資訊,請參閱從 Amazon Kinesis Data Streams 讀取資料

注意

Kinesis 會收取每個碎片的費用,以及從串流讀取資料的增強型散發。如需定價的詳細資訊,請參閱 Amazon Kinesis 定價

輪詢和批次處理串流

Lambda 會從資料串流讀取記錄並透過包含串流記錄的事件同步調用函數。Lambda 會讀取批次中的記錄並調用函數,以處理來自該批次的記錄。每個批次包含來自單一碎片/資料串流的記錄。

對於標準 Kinesis 資料串流,Lambda 會針對每個碎片以每秒一次的速率輪詢串流中的記錄碎片。對於 Kinesis 增強型散發功能,Lambda 會使用 HTTP /2 連線偵聽從 Kinesis 推送的記錄。當記錄可用時,Lambda 會調用您的函數,並等待結果。

Lambda 預設會在記錄可用時立即調用函數。如果 Lambda 從事件來源中讀取的批次只有一筆記錄,Lambda 只會傳送一筆記錄至函數。為避免調用具有少量記錄的函數,您可設定批次間隔,請求事件來源緩衝記錄最長達五分鐘。調用函數之前,Lambda 會繼續從事件來源中讀取記錄,直到收集到完整批次、批次間隔到期或者批次達到 6 MB 的承載限制。如需詳細資訊,請參閱批次處理行為

警告

Lambda 事件來源對應至少處理每個事件一次,並且可能會重複處理記錄。為了避免與重複事件相關的潛在問題,我們強烈建議您將函數代碼設為冪等。若要深入了解,請參閱 AWS 知識中心如何讓 Lambda 函數具有冪等性

在傳送下一個批次進行處理之前,Lambda 不會等待任何已設定的擴充功能完成。換句話說,當 Lambda 處理下一批記錄時,您的擴充功能可能會繼續執行。如果您違反任何帳戶的 並行 設定或限制,便可能會產生限流的問題。若要偵測此是否為潛在問題,請監控您的函數,並確認您看到的 並行指標 是否高於事件來源映射的預期值。由於兩次調用之間的時間很短,Lambda 可能會短暫報告比碎片數目更高的並行用量。即使對於沒有延伸項目的 Lambda 函數也可能如此。

ParallelizationFactor設定此設定以同時處理具有多個 Lambda 叫用的 Kinesis 資料串流碎片。您可以透過從 1 (預設) 到 10 的並行化因子指定 Lambda 從碎片輪詢的並行批次數。例如,當您設定ParallelizationFactor為 2 時,最多可以有 200 個並行 Lambda 叫用來處理 100 個 Kinesis 資料碎片 (雖然在實務中,您可能會看到指標的不同值)。ConcurrentExecutions當資料量急劇波動並且 IteratorAge 高時,這有助於縱向擴展處理輸送量。當您增加每個碎片的並行批次數時,Lambda 仍可確保在磁碟分割索引鍵層級進行順序處理。

您也可以ParallelizationFactor搭配 Kinesis 彙總使用。事件來源對應的行為取決於您是否使用增強型散發:

  • 沒有增強型散發:聚合事件中的所有事件都必須具有相同的分區索引鍵。分割索引鍵也必須與彙總事件的索引鍵相符。如果彙總事件內的事件具有不同的分區索引鍵,Lambda 無法保證按照分區索引鍵按順序處理事件。

  • 透過增強型散發:首先,Lambda 會將彙總的事件解碼為其個別事件。聚集的事件可以具有與其包含的事件不同的分區索引鍵。不過,與磁碟分割索引鍵不對應的事件會遭到捨棄並遺失。Lambda 不會處理這些事件,也不會將它們傳送到設定的故障目的地。

範例事件

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }