本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
處理重複記錄
有兩個主要原因可能會導致多次將記錄交付至您的 Amazon Kinesis Data Streams:生產者重試和取用者重試。您的應用程式必須預料並妥善因應多次處理個別記錄的問題。
製作人重試
試想有一個生產者,其呼叫 PutRecord
後但仍未能收到 Amazon Kinesis Data Streams 的確認便遇到了與網路相關的逾時情況。此生產者無法確定記錄是否已交付至 Kinesis Data Streams。假設每一筆記錄對應用程式都很重要,生產者即會撰寫成使用相同的資料重試呼叫。如果就相同的資料呼叫兩次 PutRecord
均已成功遞交至 Kinesis Data Streams,則將會有兩筆 Kinesis Data Streams 記錄。儘管這兩筆記錄具有相同的資料,但其序號各不相同。需要嚴格保證的應用程式應於記錄中嵌入主索引鍵,以便稍後進行處理時移除重複項目。請注意,因生產者重試而造成的重複項目數通常會比因消費者重試而造成的重複項目數來得少。
注意
如果您使用 AWS SDKPutRecord
,請參閱AWS SDKs和工具使用指南中的「SDK重試」行為。
消費者重試
消費者 (資料處理應用程式) 重試是在記錄處理器重新啟動時發生。同一碎片的記錄處理器將於以下情況重新啟動:
-
工作者意外終止
-
新增或移除工作者執行個體
-
碎片合併或分割
-
部署應用程式
在所有這些情況下, shards-to-worker-to-record-處理器對應不期更新為負載平衡處理。已遷移至其他執行個體的碎片處理器將從最後一個檢查點重新啟動處理記錄。這會導致重複的記錄處理,如以下範例所示。如需負載平衡的詳細資訊,請參閱使用重新碎片、縮放和 parallel 處理來變更碎片數量。
範例:取用者重試導致重新傳遞記錄
本範例中的應用程式將持續從串流讀取記錄、彙整記錄至本機檔案,然後上傳該檔案到 Amazon S3。為求簡化,假設只有 1 個碎片並由 1 個工作者處理此碎片。試想以下發生的一系列範例事件,假設最後一個檢查點位於記錄編號 10000 處:
-
工作者從碎片讀取下一批次的記錄,即記錄 10001 到 20000。
-
工作者隨後將該批次記錄傳遞至關聯的記錄處理器。
-
記錄處理器彙總資料、建立 Amazon S3 檔案並成功將該檔案上傳到 Amazon S3。
-
工作者在新的檢查點到達之前意外終止。
-
應用程式、工作者和記錄處理器重新啟動。
-
工作者現在開始從上次成功的檢查點 (本例中為 10001) 進行讀取。
因此,記錄 10001-20000 取用了一次以上。
對消費者重試有彈性
即使記錄可能經過多次處理,您的應用程式也許希望能體現副作用,猶如只處理一次記錄那樣 (等冪處理)。此問題的解決方法因複雜度與準確度而異。如果最終資料的目的地能夠妥善處理重複項目,建議您憑藉最終目的地以實現等冪處理。例如,使用 Opensearch
回顧前一節的範例應用程式,其持續從串流讀取記錄、彙整記錄至本機檔案,然後上傳該檔案到 Amazon S3。如該節所示,記錄 10001-20000 取用了一次以上,導致多個 Amazon S3 檔案具有相同的資料。化解該範例發生重複情況的一種方法是確保步驟 3 使用以下機制:
-
記錄處理器就每個 Amazon S3 檔案使用固定數目的記錄,例如 5000。
-
檔案名稱使用以下結構描述:Amazon S3 字首、碎片 ID 和
First-Sequence-Num
。在本例中,其可能類似於sample-shard000001-10001
。 -
上傳 Amazon S3 檔案後,透過指定
Last-Sequence-Num
執行檢查點作業。本例應於記錄編號 15000 處執行檢查點作業。
利用上述機制,即使記錄經過多次處理,產生的 Amazon S3 檔案也會具有相同的名稱和相同的資料。重試只會導致多次將相同的資料寫入同一個檔案。
若發生重新分片操作的情況,留存於碎片中的記錄數目可能少於您所需要的固定數目。就本例而言,您必須透過 shutdown()
方法將檔案排清至 Amazon S3 並對最後一個序號執行檢查點作業。上述機制同樣相容於重新分片操作。