使用 Kinesis Client Library - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Kinesis Client Library

開發自訂取用者應用程式以處理來自 KDS 資料串流之資料的其中一種方法,是使用 Kinesis Client Library (KCL)。

注意

對於 KCL 1.x 和 KCL 2.x,建議根據您的使用案例,升級至最新的 KCL 1.x 版或 KCL 2.x 版本。KCL 1.x 和 KCL 2.x 皆會定期更新為更新的版本,其中包含最新的相依性和安全修補程式、錯誤修正,以及向後相容的新功能。如需詳細資訊,請參閱 https://github.com/awslabs/ amazon-kinesis-client /版本

什麼是 Kinesis Client Library?

KCL 會處理與分散式運算相關的許多複雜任務,協助您取用和處理 Kinesis 資料串流中的資料。這其中包含跨多個取用者應用程式執行個體的負載平衡、對取用者應用程式執行個體失敗的回應、檢查點處理記錄,以及對重新分片的反應。KCL 會處理所有這些子任務,以便您可以專注在編寫自訂記錄處理邏輯上。

KCL 與 AWS SDK 中提供的 Kinesis Data Streams API 不同。Kinesis Data Streams API 可協助您管理 Kinesis Data Streams 的許多層面 (包括建立串流、重新分片、放入與取得記錄)。KCL 圍繞所有這些子任務提供了抽象層,特別是可讓您專注於取用者應用程式的自訂資料處理邏輯。如需 Kinesis Data Streams API 的相關資訊,請參閱 Amazon Kinesis API 參考

重要

KCL 是一種 Java 程式庫。Support Java 以外的語言的支援是使用稱為 MultiLangDaemon. 此常駐程式是以 Java 為基礎,並在您使用 Java 以外的 KCL 語言時在背景執行。例如,如果您安裝適用於 Python 的 KCL,並完全使用 Python 編寫您的消費者應用程式,您仍然需要在系統上安裝 Java,因為. MultiLangDaemon 此外, MultiLangDaemon 還有一些您可能需要針對您的使用案例自訂的預設設定,例如,它連線到的 AWS 區域。若要取得有關的詳細 MultiLangDaemon 資訊 GitHub,請參閱 KCL MultiLangDaemon 專案

KCL 在您的記錄處理邏輯與 Kinesis Data Streams 之間擔任媒介。KCL 將執行以下任務:

  • 連線到資料串流

  • 列舉資料串流內的碎片

  • 使用租用協調碎片與其工作者的關聯

  • 為其所管理的每個碎片執行個體化記錄處理器

  • 從資料串流提取資料記錄

  • 將記錄推送至對應的記錄處理器

  • 對已處理的記錄執行檢查點作業

  • 在工作者執行個體計數變更,或重新分割資料串流 (分割或合併碎片) 時,平衡碎片與工作者關聯 (租用)

KCL 可用版本

目前,您可以使用下列受支援的 KCL 版本之一,來建置自訂的取用者應用程式:

您可以使用 KCL 1.x 或 KCL 2.x 來建置使用共用輸送量的取用者應用程式。如需詳細資訊,請參閱 使用 KCL 開發具備共用傳輸量的自訂消費者

若要建置使用專用輸送量的取用者應用程式 (增強型散發取用者),您只能使用 KCL 2.x。如需詳細資訊,請參閱 開發具備專用傳輸量的自訂消費者 (強化廣發功能)

如需有關 KCL 1.x 和 KCL 2.x 之間差異的詳細資訊,以及如何從 KCL 1.x 遷移至 KCL 2.x 的指示,請參閱 將消費者從 KCL 1.x 遷移到 KCL 2.x

KCL 概念

  • KCL 取用者應用程式 – 使用 KCL 自訂建置的應用程式,專為讀取和處理資料串流中的記錄而設計。

  • 取用者應用程式執行個體 - KCL 取用者應用程式通常是分散式,可同時執行一個或多個應用程式執行個體,以在發生故障時進行協調並對資料記錄處理進行動態負載平衡。

  • 工作者 – KCL 取用者應用程式執行個體用來開始處理資料的高階類別。

    重要

    每個 KCL 取用者應用程式執行個體都有一個工作者。

    工作者會初始化並監督各種任務,包括同步處理碎片和租用資訊、追蹤碎片指派,以及處理來自碎片的資料。Worker 會為 KCL 提供取用者應用程式的組態資訊,例如資料記錄此 KCL 取用者應用程式將要處理的資料串流名稱,以及存取此資料串流所需的 AWS 認證。工作者也會啟動該特定 KCL 取用者應用程式執行個體,將資料記錄從資料串流傳送至記錄處理器。

    重要

    在 KCL 1.x 中,此類別被稱為工作者。有關更多信息,(這些是 Java KCL 存儲庫),請參閱 https://github.com/awslabs/ amazon-kinesis-client /BLOB/V1.x/SRC/主/java/COM /亞馬遜/服務/手動/客戶庫/庫/工作者/工人 .java。在 KCL 2.x.x 中,此類別被稱為排程器。排程器在 KCL 2.x 中的用途與 KCL 1.x 中的工作者的目的相同。如需有關 KCL 2.x 中的排程器類別的詳細資訊,請參閱 https://github.com/awslabs/ amazon-kinesis-client。amazon-kinesis-client /src/main/java/software/amazon/kinesis/coordinator/Scheduler.java

  • 租用 – 定義工作者與碎片之間繫結的資料。分散式 KCL 取用者應用程式使用租用來分割跨工作節點機群的資料記錄處理。在任何給定時間,每個資料記錄碎片都會透過 leaseKey 變數所識別的租用繫結至特定工作者。

    依預設,Worker 可同時持有一或多個租約 (視 maxLeasesForWorker 變數的值而定)。

    重要

    每個工作者都將爭奪保留資料串流中,所有可用碎片的所有可用租用。但是,只有一名工作者可以在任何時間成功持有每個租用。

    例如,如果您有一個含有工作者 A 的取用者應用程式執行個體 A 正在處理具有 4 個碎片的資料串流,則工作者 A 可以同時持有對碎片 1、2、3 和 4 的租用。但是,如果您有兩個取用者應用程式執行個體:A 和 B 具有工作者 A 和工作者 B,而且這些執行個體正在處理具有 4 個碎片的資料串流,則工作者 A 和工作者 B 無法同時持有對碎片 1 的租用。一個工作者會持有特定碎片的租用,直到準備好停止處理此碎片的資料記錄,或直到失敗為止。當一名工作者停止持有租用時,另一名工作者佔用並持有租用。

    欲了解更多信息,(這些都是 Java KCL 存儲庫),請參閱 https://github.com/awslabs/ amazon-kinesis-client /BLOB/V1.X/SRC /主/爪/COM /亞馬遜/服務/動力/租賃/實現/租賃.java 用於 KCL 1.x 和 https://github.com/awslabs/ /blob /主/ /src/main/java/software/amazon/kinesis/leases/Lease.java 用於 KCL 2.x。amazon-kinesis-client amazon-kinesis-client

  • 租用資料表 - 唯一的 Amazon DynamoDB 資料表,用於追蹤 KDS 資料串流中,由 KCL 取用者應用程式的工作者租用和處理的碎片。在 KCL 取用者應用程式執行時,租用資料表必須與資料串流中的最新碎片資訊保持同步 (在工作者內部和所有工作者之間)。如需詳細資訊,請參閱 使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片

  • 記錄處理器 – 定義 KCL 取用者應用程式如何處理從資料串流取得的資料的邏輯。在執行期,KCL 取用者應用程式執行個體會實體化工作者,而此工作者會針對其持有租用的每個碎片執行個體化一個記錄處理器。

使用租用資料表來追蹤 KCL 取用者應用程式處理的碎片

什麼是租用資料表

這對每個 Amazon Kinesis Data Streams 應用程式,KCL 會使用唯一的租用資料表 (存儲在 Amazon DynamoDB 資料表中),來追蹤 KDS 資料串流中由 KCL 取用者應用程式的工作者租用和處理的碎片。

重要

KCL 會使用取用者應用程式的名稱來建立此取用者應用程式所使用的租用資料表名稱,因此,每個取用者應用程式名稱都必須是唯一的。

您可以在取用者應用程式執行時使用 Amazon DynamoDB 主控台檢視其租用資料表。

如果應用程式啟動時,KCL 取用者應用程式的租用資料表不存在,其中一個工作者會建立此應用程式的租用資料表。

重要

您的帳戶除須支付 Kinesis Data Streams 本身的相關費用外,另將收取與 DynamoDB 資料表關聯的費用。

租用資料表內的每一列代表您的取用者應用程式的工作者所處理的某個碎片。如果您的 KCL 取用者應用程式僅處理一個資料串流,則租用資料表的雜湊索引鍵 leaseKey 就是碎片 ID。如果您是 使用相同的適用於 Java 的 KCL 2.x 取用者應用程式處理多個資料串流,則 leaseKey 的結構如下所示:account-id:StreamName:streamCreationTimestamp:ShardId。例如 111111111:multiStreamTest-1:12345:shardId-000000000336

除了碎片 ID 外,每一列還包含以下資料:

  • checkpoint:碎片的最新檢查點序號。資料串流中所有碎片的此值皆為獨一無二。

  • checkpointSubSequence數字:使用 Kinesis Producer 程式庫的彙總功能時,這是檢查點的擴充功能,可追蹤 Kinesis 記錄中的個別使用者記錄。

  • leaseCounter:用於租用版本控制,使工作者可偵測出其租用已由另一工作者接管。

  • leaseKey:租用的唯一識別符。每項租用特屬於資料串流中的某個碎片,一次由一個工作者所持有。

  • leaseOwner:持有此租用的工作者。

  • ownerSwitchesSince檢查點:自上次寫入檢查點以來,此租約已變更工作者的次數。

  • parentShardId:用於確保父碎片在子碎片上開始處理之前完全處理父碎片。這可確保按照記錄放入串流中的相同順序處理記錄。

  • hashrange:PeriodicShardSyncManager 用於執行週期性同步以尋找租用資料表中遺失的碎片,並在需要時為其建立租用。

    注意

    從 KCL 1.14 和 KCL 2.3 開始,每個碎片的租用資料表中都會顯示此資料。如需有關 PeriodicShardSyncManager 和租用與碎片之間的定期同步的詳細資訊,請參閱 租用資料表如何與 KDS 資料串流中的碎片同步處理

  • childshards:LeaseCleanupManager 用於檢閱子碎片的處理狀態,並決定是否可以從租用資料表中刪除父碎片。

    注意

    從 KCL 1.14 和 KCL 2.3 開始,每個碎片的租用資料表中都會顯示此資料。

  • shardID:碎片的 ID。

    注意

    如果您是 使用相同的適用於 Java 的 KCL 2.x 取用者應用程式處理多個資料串流,則此資料僅存在於租用資料表中。這僅在適用於 Java 的 KCL 2.x 中受支援 (從適用於 Java 的 KCL 2.3 及更高版本開始)。

  • 串流名稱資料串流的識別碼,格式如下:account-id:StreamName:streamCreationTimestamp

    注意

    如果您是 使用相同的適用於 Java 的 KCL 2.x 取用者應用程式處理多個資料串流,則此資料僅存在於租用資料表中。這僅在適用於 Java 的 KCL 2.x 中受支援 (從適用於 Java 的 KCL 2.3 及更高版本開始)。

輸送量

如果您的 Amazon Kinesis Data Streams 應用程式收到佈建輸送量例外狀況,則您即應提升 DynamoDB 資料表的佈建輸送量。KCL 建立的資料表其佈建輸送量為每秒 10 次讀取和每秒 10 次寫入,但這對您的應用程式而言可能不夠。例如,若您的 Amazon Kinesis Data Streams 經常執行檢查點作業或對由多個碎片構成的串流進行操作,您可能就需要更多的輸送量。

如需 DynamoDB 中佈建輸送量的相關資訊,請參閱《Amazon DynamoDB 開發人員指南》中的讀取/寫入容量模式使用資料表和資料

租用資料表如何與 KDS 資料串流中的碎片同步處理

KCL 取用者應用程式中的工作者會使用租用來處理來自指定資料串流的碎片。在任何給定時間,哪個工作者正在租用哪個碎片的資訊存儲在租用資料表中。在 KCL 取用者應用程式執行時,租用資料表必須與資料串流中的最新碎片資訊保持同步 。KCL 會在取用者應用程式啟動載入期間 (在取用者應用程式初始化或重新啟動時),以及每當正在處理的碎片到達結束 (重新分割) 時,將租用資料表與從 Kinesis Data Streams 服務取得的碎片資訊同步化。換句話說,工作者或 KCL 取用者應用程式會與它們在初始使用者應用程式啟動程序期間處理的資料串流,以及每當取用者應用程式遇到資料串流重新分片事件時,都會與其所處理的資料串流同步處理。

KCL 1.0-1.13 和 KCL 2.0-2.2 中的同步

在 KCL 1.0 - 1.13 和 KCL 2.0 - 2.2 中,在取用者應用程式的啟動載入期間以及每個資料串流重新分片事件期間,KCL 會透過調用 ListShardsDescribeStream 探索 API,將租用資料表與從 Kinesis 資料串流服務取得的碎片資訊同步。在上面列出的所有 KCL 版本中,KCL 取用者應用程式的每個工作者都會完成下列步驟,以便在取用者應用程式的啟動載入期間以及每個串流重新分片事件中執行租用/碎片同步處理程序:

  • 擷取正在處理的資料串流的所有碎片

  • 從租用資料表中擷取所有碎片租用

  • 篩選出租用資料表中沒有租用的每個開放碎片

  • 逐一查看所有找到的開放碎片以及每個沒有開放父級的開放碎片:

    • 透過其祖先路徑遍歷樹狀結構,以確定碎片是否為子代。如果正在處理祖系碎片 (租用資料表中存在祖系碎片的租用項目),或者應處理祖系碎片 (例如,如果初始位置為 TRIM_HORIZONAT_TIMESTAMP),則碎片即視為子代

    • 如果內容中的開放碎片是子代,KCL 會根據初始位置檢查碎片,並在必要時為其父項建立租用

KCL 2.x 中的同步,從 KCL 2.3 及更高版本開始

從 KCL 2.x (KCL 2.3) 及更新版本的最新支援版本開始,程式庫現在支援同步處理程序的以下變更。這些租用/碎片同步變更可大幅減少 KCL 取用者應用程式對 Kinesis Data Streams 服務進行的 API 呼叫次數,並最佳化 KCL 取用者應用程式中的租用管理。

  • 在應用程式的啟動載入期間,如果租用資料表是空的,則 KCL 會利用 ListShard API 的篩選選項 (ShardFilter 選用的請求參數) 來擷取和建立租用,僅用於在 ShardFilter 參數指定的時間開放的碎片快照。此 ShardFilter 參數可讓您篩選出 ListShards API 的回應。ShardFilter 參數的唯一必要屬性是 Type。KCL 會使用 Type 篩選屬性及其下列有效值來識別並傳回可能需要新租用之開啟碎片的快照:

    • AT_TRIM_HORIZON - 回應包括所有在 TRIM_HORIZON 打開的碎片。

    • AT_LATEST - 回應僅包含目前開放的資料串流碎片。

    • AT_TIMESTAMP - 回應包含開始時間戳記小於或等於指定時間戳記,且結束時間戳記大於或等於指定時間戳記或仍處於開放狀態的所有碎片。

    ShardFilter 用於為空租用資料表建立租用,以針對在 RetrievalConfig#initialPositionInStreamExtended 指定之碎片的快照初始化租用。

    如需 ShardFilter 的相關資訊,請參閱 https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html

  • 單一當選的工作者領導者執行租用/碎片同步處理,而不是執行租用/碎片同步處理以使租用資料表與資料串流中的最新碎片保持最新狀態的所有工作者。

  • KCL 2.3 使用 ChildShards 傳回 GetRecordsSubscribeToShard API 的參數來執行在關閉碎片 SHARD_END 上發生的租用/碎片同步處理,允許 KCL 工作者只為其完成處理之碎片的子碎片建立租用。對於共用輸送量取用者應用程式,此租用/碎片同步處理的最佳化會使用 GetRecords API 的 ChildShards 參數。對於專用輸送量 (增強型散發) 取用者應用程式,此租用/碎片同步處理的最佳化會使用 SubscribeToShard API 的 ChildShards 參數。如需詳細資訊 GetRecords,請參閱SubscribeToShards、和ChildShard

  • 透過上述變更,KCL 的行為將從學習所有現有碎片的所有工作者的模型,轉移到僅學習每個工作者所擁有碎片的子碎片的工作者模型。因此,除了取用者應用程式啟動載入和重新分片事件期間發生的同步處理之外,KCL 現在還會執行額外的定期碎片/租用掃描,以識別租用資料表中的任何潛在漏洞 (換句話說,了解所有新碎片),以確保資料串流的完整雜湊範圍正在處理,並在需要時為其建立租用。PeriodicShardSyncManager 是負責執行定期租用/碎片掃描的元件。

    如需關於 PeriodicShardSyncManager KCL 2.3 的詳細資訊,請參閱 https://github.com/awslabs/ amazon-kinesis-client /blob /主/src/主/爪/軟體/亞馬遜/動力/租賃/ amazon-kinesis-client .java #L201-L213. LeaseManagementConfig

    在 KCL 2.3 中,新組態選項可用於在 LeaseManagementConfig 中設定 PeriodicShardSyncManager

    名稱 預設值 描述
    leasesRecoveryAuditorExecutionFrequencyMillis

    120000 (2 分鐘)

    稽核人員工作掃描租用資料表中部分租用的頻率 (以毫秒為單位)。如果稽核人員偵測到串流租用中的任何漏洞,則會根據 leasesRecoveryAuditorInconsistencyConfidenceThreshold 觸發碎片同步處理。

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    定期稽核人員工作的信賴閾值,用於確定租用資料表中資料串流的租用是否不一致。如果稽核人員多次連續發現同一組資料串流的不一致,則會觸發碎片同步處理。

    現在也會發出新的 CloudWatch 指標來監控. PeriodicShardSyncManager 如需詳細資訊,請參閱 PeriodicShardSyncManager

  • 包括 HierarchicalShardSyncer 的最佳化,以僅為一層碎片建立租用。

KCL 1.x 中的同步,從 KCL 1.14 及更高版本開始

從 KCL 1.x (KCL 1.14) 及更新版本的最新支援版本開始,程式庫現在支援同步處理程序的下列變更。這些租用/碎片同步變更可大幅減少 KCL 取用者應用程式對 Kinesis Data Streams 服務進行的 API 呼叫次數,並最佳化 KCL 取用者應用程式中的租用管理。

  • 在應用程式的啟動載入期間,如果租用資料表是空的,則 KCL 會利用 ListShard API 的篩選選項 (ShardFilter 選用的請求參數) 來擷取和建立租用,僅用於在 ShardFilter 參數指定的時間開放的碎片快照。此 ShardFilter 參數可讓您篩選出 ListShards API 的回應。ShardFilter 參數的唯一必要屬性是 Type。KCL 會使用 Type 篩選屬性及其下列有效值來識別並傳回可能需要新租用之開啟碎片的快照:

    • AT_TRIM_HORIZON - 回應包括所有在 TRIM_HORIZON 打開的碎片。

    • AT_LATEST - 回應僅包含目前開放的資料串流碎片。

    • AT_TIMESTAMP - 回應包含開始時間戳記小於或等於指定時間戳記,且結束時間戳記大於或等於指定時間戳記或仍處於開放狀態的所有碎片。

    ShardFilter 用於為空租用資料表建立租用,以針對在 KinesisClientLibConfiguration#initialPositionInStreamExtended 指定之碎片的快照初始化租用。

    如需 ShardFilter 的相關資訊,請參閱 https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html

  • 單一當選的工作者領導者執行租用/碎片同步處理,而不是執行租用/碎片同步處理以使租用資料表與資料串流中的最新碎片保持最新狀態的所有工作者。

  • KCL 1.14 使用 ChildShards 傳回 GetRecordsSubscribeToShard API 的參數來執行在關閉碎片 SHARD_END 上發生的租用/碎片同步處理,允許 KCL 工作者只為其完成處理之碎片的子碎片建立租用。如需詳細資訊,請參閱GetRecordsChildShard

  • 透過上述變更,KCL 的行為將從學習所有現有碎片的所有工作者的模型,轉移到僅學習每個工作者所擁有碎片的子碎片的工作者模型。因此,除了取用者應用程式啟動載入和重新分片事件期間發生的同步處理之外,KCL 現在還會執行額外的定期碎片/租用掃描,以識別租用資料表中的任何潛在漏洞 (換句話說,了解所有新碎片),以確保資料串流的完整雜湊範圍正在處理,並在需要時為其建立租用。PeriodicShardSyncManager 是負責執行定期租用/碎片掃描的元件。

    KinesisClientLibConfiguration#shardSyncStrategyType 設定為 ShardSyncStrategyType.SHARD_END 時,PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold 用於確定包含租用資料表中漏洞的連續掃描數目臨界值,之後強制執行碎片同步化。當 KinesisClientLibConfiguration#shardSyncStrategyType 設定為 ShardSyncStrategyType.PERIODIC 時,會忽略 leasesRecoveryAuditorInconsistencyConfidenceThreshold

    有關 KCL 1.14 PeriodicShardSyncManager 中的更多信息,請參閱 https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src /主/java/COM /亞馬遜/服務/激勵/客戶庫/KinesisClientLibConfiguration庫/工人/. #L987

    在 KCL 1.14 中,新組態選項可用於在 LeaseManagementConfig 中設定 PeriodicShardSyncManager

    名稱 預設值 描述
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    定期稽核人員工作的信賴閾值,用於確定租用資料表中資料串流的租用是否不一致。如果稽核人員多次連續發現同一組資料串流的不一致,則會觸發碎片同步處理。

    現在也會發出新的 CloudWatch 指標來監控. PeriodicShardSyncManager 如需詳細資訊,請參閱 PeriodicShardSyncManager

  • KCL 1.14 現在也支援延遲租用清除。當碎片超過資料串流的保留期限或因重新分片操作而關閉時,達到 SHARD_END 時,LeaseCleanupManager 會以非同步方式刪除租用。

    新組態選項可用於設定 LeaseCleanupManager

    名稱 預設值 描述
    leaseCleanupInterval米利斯

    1 分鐘

    執行租用清除執行緒的間隔。

    completedLeaseCleanupIntervalMillis 5 分鐘

    檢查租用是否完成的間隔。

    garbageLeaseCleanupIntervalMillis 30 分鐘

    檢查租用是否為垃圾 (亦即超過資料串流保留期所進行的修剪) 的間隔。

  • 包括 KinesisShardSyncer 的最佳化,以僅為一層碎片建立租用。

使用相同的適用於 Java 的 KCL 2.x 取用者應用程式處理多個資料串流

本節說明下列 Java KCL 2.x 中的變更,這些變更可讓您建立可同時處理多個資料串流的 KCL 取用者應用程式。

重要

多串流處理僅在適用於 Java 的 KCL 2.x 中受支援,從適用於 Java 的 KCL 2.3 及更高版本開始。

對於可以實現 KCL 2.x 的任何其他語言,「不」支援多串流處理。

任何 KCL 1.x 版本均不支援多串流處理。

  • MultistreamTracker 接口

    若要建置可同時處理多個串流的消費者應用程式,您必須實作名為的新介面MultistreamTracker。此介面包含傳回資料串流清單及其組態的 streamConfigList 方法,以供 KCL 取用者應用程式處理。請注意,正在處理的資料串流可以在取用者應用程式執行期變更。KCL 會定期呼叫 streamConfigList,以瞭解要處理的資料串流變更。

    streamConfigList方法填充StreamConfig列表。

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    請注意,StreamIdentifierInitialPositionInStreamExtended 是必填欄位,而 consumerArn 是選填欄位。只有在您使用 KCL 2.x 來實作增強型散發取用者應用程式時,才必須提供 consumerArn

    如需更多相關資訊StreamIdentifier,請參閱 https://github.com/awslabs/ amazon-kinesis-client /blob /v2.5.8/ /src/主要/爪/軟體/亞馬遜/動力/共用/ amazon-kinesis-client .java #L129. StreamIdentifier 若要建立StreamIdentifier,建議您從 v2.5.0 及更新版本中提供的streamArnstreamCreationEpoch和建立多串流執行個體。在不支援streamArm的 KCL v2.3 和 v2.4 中,請使用格式建立多串流執行個體。account-id:StreamName:streamCreationTimestamp從下一個主要版本開始,將不再支援此格式。

    MultistreamTracker 還包括刪除租用資料表 (formerStreamsLeasesDeletionStrategy) 中舊串流的租用的策略。請注意,在取用者應用程式執行期,無法變更策略。如需詳細資訊,請參閱 https://github.com/awslabs/ amazon-kinesis-client /blob/主/爪/軟體/亞馬遜/動力/處理amazon-kinesis-client器 FormerStreamsLeasesDeletionStrategy

  • ConfigsBuilder是應用程式範圍的類別,可用來指定建置 KCL 取用者應用程式時要使用的所有 KCL 2.x 組態設定。 ConfigsBuilder類現在有對MultistreamTracker接口的支持。您可以使用一個數據流的名稱進行初始化 ConfigsBuilder,以使用來自以下內容的記錄:

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    或者,MultiStreamTracker如果要實現同時處理多個流的 KCL 消費者應用程序,則可以 ConfigsBuilder 使用初始化。

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • 針對 KCL 取用者應用程式實作多串流支援後,應用程式租用資料表的每一列現在都包含碎片 ID 和此應用程式所處理之多個資料串流的串流名稱。

  • 實作 KCL 取用者應用程式的多串流支援時,leaseKey 會採用下列結構:account-id:StreamName:streamCreationTimestamp:ShardId。例如 111111111:multiStreamTest-1:12345:shardId-000000000336

    重要

    如果您的現有 KCL 取用者應用程式設定僅處理一個資料串流,則 leaseKey (租用資料表的雜湊索引鍵) 就是碎片 ID。如果您重新設定這個現有的 KCL 取用者應用程式來處理多個資料串流,則它會中斷租用資料表,因為有了多重串流支援,leaseKey 結構必須如下所示:account-id:StreamName:StreamCreationTimestamp:ShardId

搭配 AWS Glue 結構描述登錄使用 Kinesis 用戶端程式庫

您可以將 Kinesis 資料串流與 AWS Glue 結構描述登錄整合。 AWS Glue 結構描述登錄可讓您集中探索、控制和發展結構描述,同時確保產生的資料會持續經過註冊的結構描述驗證。結構描述定義資料記錄的結構和格式。結構描述是可靠的資料發佈、耗用或儲存的版本化規格。 AWS Glue 結構描述登錄可讓您改善串流應用程式中的資 end-to-end 料品質和資料控管。如需詳細資訊,請參閱 AWS Glue 結構描述登錄檔。設定此整合的方法之一是透過 Java 中的 KCL。

重要

目前,Kinesis Data Streams 和 AWS Glue 結構描述登錄整合僅支援使用 Java 實作的 KCL 2.3 取用者的 Kinesis 資料串流。不提供多語言支援。不支援 KCL 1.0 取用者。不支援 KCL 2.3 之前的 KCL 2.x 取用者。

如需有關如何使用 KCL 設定 Kinesis Data Streams 與結構描述登錄整合的詳細指示,請參閱使用案例:將 Amazon Kinesis 資料串流與 Glue 結構描述登錄整合中的「使用 KPL/KCL 程式庫與資料互動」一節。 AWS