Amazon Kinesis 流作為管道的 EventBridge 來源 - Amazon EventBridge

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Amazon Kinesis 流作為管道的 EventBridge 來源

您可以使用 EventBridge 管道接收 Kinesis 資料串流中的記錄。然後,您可以選擇性地篩選或增強這些記錄,然後再將它們傳送到可用的目的地之一進行處理。您可以在設定管道時選擇 Kinesis 特定的設定。 EventBridge 當將資料傳送至目的地時,管道會維護資料流中的記錄順序。

Kinesis 資料串流是一組碎片。每個碎片包含一系列的資料記錄。取用程式是處理來自 Kinesis 資料串流的資料的應用程式。您可以將 EventBridge Pipe 對應至共用輸送量取用者 (標準迭代器),或對應至具有增強型散發功能的專用輸送量取用者。

對於標準迭代器, EventBridge 使用HTTP通訊協定來輪詢 Kinesis 串流中的每個碎片以取得記錄。此函式會與碎片的其他消費者共用碎片讀取輸送量。

若要將延遲降至最低並最大化讀取輸送量,您可以建立具有增強散發功能的資料串流取用者。串流取用者會取得每個碎片的專用連線,其不會影響其他從串流讀取的應用程式。如果您有許多應用程式讀取相同的資料,或者,如果您要重新處理具有大量記錄的串流,則專屬的輸送量很有助益。Kinesis 將記錄推到 EventBridge /2. HTTP 如需關於 Kinesis 資料串流的資訊,請參閱從 Amazon Kinesis Data Streams 讀取資料

範例事件

下列範例事件顯示管道接收的資訊。您可以使用此事件來建立和篩選事件模式,或定義輸入轉換。並非所有欄位都可以篩選。如需有關所能篩選欄位的詳細資訊,請參閱 Amazon EventBridge 管道中的事件過濾

[ { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ]

輪詢和批次處理串流

EventBridge 輪詢會在 Kinesis 串流中以每秒四次的基本速率進行記錄碎片。當記錄可用時, EventBridge 處理事件並等待結果。如果處理成功,會 EventBridge 繼續輪詢,直到收到更多記錄為止。

根據預設,只要有記錄可用,就會 EventBridge 叫用管道。如果從來源 EventBridge 讀取的批次中只有一筆記錄,則只會處理一個事件。為避免處理少量記錄,您可讓管道設定批次間隔,要求管道緩衝記錄最長達五分鐘。在處理事件之前,會 EventBridge 繼續從來源讀取記錄,直到收集完整批次、批次處理期間到期或批次達到 6 MB 的有效負載限制為止。

您還可以透過並行處理來自每個碎片的多個批次來增加並行性。 EventBridge 可以同時在每個分片中處理多達 10 個批次。如果增加每個碎片的並行批次數, EventBridge 仍然可以確保在分區索引鍵層級進行按順序處理。

規劃 ParallelizationFactor 設定來同時處理 Kinesis 或 DynamoDB 資料串流的一個碎片與多個管道執行。您可以透過平行化因子從 1 (預設值) 到 10,指定從碎片 EventBridge 輪詢的並行批次數。例如,當您設定ParallelizationFactor為 2 時,最多可以有 200 個並行 EventBridge 管道執行,以處理 100 個 Kinesis 資料碎片。當資料量急劇波動並且 IteratorAge 高時,這有助於縱向擴展處理輸送量。請注意,如果您使用 Kinesis 彙總,則並行化因子將不起作用。

輪詢和串流開始位置

請注意,建立和更新管道期間的串流輪詢最終會一致。

  • 在建立管道期間,從串流開始輪詢事件可能需要幾分鐘時間。

  • 在更新管道資源輪訓組態期間,從串流停止並重新開始輪詢事件可能需要幾分鐘時間。

這表示如果您指定 LATEST 當作串流的開始位置,管道可能會在建立或更新期間遺漏事件。若要確保沒有遺漏任何事件,請將串流開始位置指定為 TRIM_HORIZONAT_TIMESTAMP

報告批次項目失敗

當使 EventBridge 用和處理來源的串流資料時,依預設,它會檢查點為批次的最高序號,但只有在批次完全成功時才會檢查點。若要避免重新處理失敗批次中成功處理過的訊息,您可以設定擴充或目標,傳回哪些訊息成功,哪些失敗的物件。我們將其稱為部分批次回應。

如需詳細資訊,請參閱 部分批次失敗

成功與失敗條件

如果您傳回下列任一項目,請 EventBridge 將批次視為完全成功:

  • 空白 batchItemFailure 清單

  • Null batchItemFailure 清單

  • 空白 EventResponse

  • Null EventResponse

如果您傳回下列任一項目,則 EventBridge 會將批次視為完全失敗:

  • 空白字串 itemIdentifier

  • Null itemIdentifier

  • 具有錯誤金鑰名稱的 itemIdentifier

EventBridge 根據您的重試策略重試失敗。