將消費者從 KCL 1.x 遷移到 KCL 2.x - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將消費者從 KCL 1.x 遷移到 KCL 2.x

此主題說明 Kinesis Client Library (KCL) 版本 1.x 和 2.x 之間的差異。她還說明如何將取用者從 KCL 的版本 1.x 遷移至版本 2.x。遷移用戶端後,該用戶端會從前一個檢查點的位置開始處理記錄。

2.0 版 KCL 引進了以下的介面變更:

KCL 介面變更
KCL 1.x 介面 KCL 2.0 介面
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware 整併至 software.amazon.kinesis.processor.ShardRecordProcessor

遷移記錄處理器

以下範例顯示基於 KCL 1.x 所實作的記錄處理器:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
遷移記錄處理器類別
  1. 將介面從 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorcom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware 更改為 software.amazon.kinesis.processor.ShardRecordProcessor,如下所示:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. 更新 importinitialize 方法的 processRecords 陳述式。

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. shutdown 方法取代為以下的新方法:leaseLostshardEndedshutdownRequested

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

記錄處理器類別經更新後的版本如下。

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

遷移記錄處理器處理站

記錄處理器處理站負責在取得租用時建立記錄處理器。以下是 KCL 1.x 處理站的範例。

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
遷移記錄處理器處理站
  1. 將實作的介面從 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory 更改為 software.amazon.kinesis.processor.ShardRecordProcessorFactory,如下所示。

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. 更改 createProcessor 的傳回簽章。

    // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

以下是 2.0 版記錄處理器處理站的範例:

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

遷移工作者

在 KCL 的版本 2.0,名為 Scheduler​ 的新類別會取代 ​Worker 類別。以下是 KCL 1.x 工作者的範例。

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
遷移至工作者
  1. Worker 類別的 import 陳述式變更為 SchedulerConfigsBuilder 類別的匯入陳述式。

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. 建立 ConfigsBuilderScheduler,如下列範例所示:

    建議您在 KinesisAsyncClient 中,使用 KinesisClientUtil 來建立 KinesisAsyncClient 及設定 maxConcurrency

    重要

    Amazon Kinesis Client 可能會明顯發生延遲,除非您設定 KinesisAsyncClientmaxConcurrency 夠高,足以運作所有的租賃服務,並可額外使用 KinesisAsyncClient

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

設定 Amazon Kinesis 用戶端

隨著 2.0 版 Kinesis Client Library 的推出,用戶端組態已從單一組態類別 (KinesisClientLibConfiguration) 進展為六個組態類別。下表說明遷移情形。

組態欄位及其新類別
原始欄位 新的組態類別 描述
applicationName ConfigsBuilder 此 KCL 應用程式的名稱。用做為 tableNameconsumerName 的預設值。
tableName ConfigsBuilder 允許覆寫用於 Amazon DynamoDB 租用資料表的資料表名稱。
streamName ConfigsBuilder 此應用程式從中處理其記錄的串流名稱。
kinesisEndpoint ConfigsBuilder 此選項已經移除。請參閱「用戶端組態移除項目」一節。
dynamoDBEndpoint ConfigsBuilder 此選項已經移除。請參閱「用戶端組態移除項目」一節。
initialPositionInStreamExtended RetrievalConfig KCL 開始擷記錄所在碎片中的位置,透過應用程式初次執行開始。
kinesisCredentialsProvider ConfigsBuilder 此選項已經移除。請參閱「用戶端組態移除項目」一節。
dynamoDBCredentialsProvider ConfigsBuilder 此選項已經移除。請參閱「用戶端組態移除項目」一節。
cloudWatchCredentialsProvider ConfigsBuilder 此選項已經移除。請參閱「用戶端組態移除項目」一節。
failoverTimeMillis LeaseManagementConfig 將租用擁有者視為失敗前必須經過的毫秒數。
workerIdentifier ConfigsBuilder 代表應用程式處理器本項實例的唯一識別符。其必須獨一無二。
shardSyncIntervalMillis LeaseManagementConfig 碎片同步呼叫的時間。
maxRecords PollingConfig 允許設定 Kinesis 傳回的記錄數上限。
idleTimeBetweenReadsInMillis CoordinatorConfig 此選項已經移除。請參閱「閒置時間移除項目」一節。
callProcessRecordsEvenForEmptyRecordList ProcessorConfig 設定時,即使 Kinesis 無提供任何記錄也會呼叫記錄處理器。
parentShardPollIntervalMillis CoordinatorConfig 記錄處理器應該輪詢以檢查父碎片是否已完成的頻率。
cleanupLeasesUponShardCompletion LeaseManagementConfig 設定時,只要已開始處理子租用就會隨即移除租用。
ignoreUnexpectedChildShards LeaseManagementConfig 設定時,會忽略具有開放碎片的子碎片。此項主要用於 DynamoDB Streams。
kinesisClientConfig ConfigsBuilder 此選項已經移除。請參閱「用戶端組態移除項目」一節。
dynamoDBClientConfig ConfigsBuilder 此選項已經移除。請參閱「用戶端組態移除項目」一節。
cloudWatchClientConfig ConfigsBuilder 此選項已經移除。請參閱「用戶端組態移除項目」一節。
taskBackoffTimeMillis LifecycleConfig 重試失敗任務的等待時間。
metricsBufferTimeMillis MetricsConfig 控制 CloudWatch 指標發佈。
metricsMaxQueueSize MetricsConfig 控制 CloudWatch 指標發佈。
metricsLevel MetricsConfig 控制 CloudWatch 指標發佈。
metricsEnabledDimensions MetricsConfig 控制 CloudWatch 指標發佈。
validateSequenceNumberBeforeCheckpointing CheckpointConfig 此選項已經移除。請參閱「檢查點序號驗證」一節。
regionName ConfigsBuilder 此選項已經移除。請參閱「用戶端組態移除項目」一節。
maxLeasesForWorker LeaseManagementConfig 應用程式的單一執行個體應該接受的租用數上限。
maxLeasesToStealAtOneTime LeaseManagementConfig 應用程式一次應該嘗試挪用的租用數上限。
initialLeaseTableReadCapacity LeaseManagementConfig Kinesis Client Library 需要建立新的 DynamoDB 租用資料表時所使用的 DynamoDB 讀取 IOP。
initialLeaseTableWriteCapacity LeaseManagementConfig Kinesis Client Library 需要建立新的 DynamoDB 租用資料表時所使用的 DynamoDB 讀取 IOP。
initialPositionInStreamExtended LeaseManagementConfig 應用程式應該在串流中開始的初始位置。這僅在初次建立租用時使用。
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig 如果租用資料表包含現有的租用,即停用同步處理碎片資料。TODO:KinesisEco-438
shardPrioritization CoordinatorConfig 要使用哪些碎片優先順序
shutdownGraceMillis N/A 此選項已經移除。請參閱「MultiLang 移除項目」一節。
timeoutInSeconds N/A 此選項已經移除。請參閱「MultiLang 移除項目」一節。
retryGetRecordsInSeconds PollingConfig 為故障設定 GetRecords 嘗試間的延遲。
maxGetRecordsThreadPool PollingConfig 用於 GetRecords 的執行緒集區大小。
maxLeaseRenewalThreads LeaseManagementConfig 控制租用續約執行緒集區的大小。應用程式可容納的租用數愈多,此集區就應該愈大。
recordsFetcherFactory PollingConfig 允許將工廠進行替換,該工廠會用於建立擷取程式,而該擷取程式會從串流進行擷取。
logWarningForTaskAfterMillis LifecycleConfig 任務未完成的情況下要等待多久的時間才記錄警告。
listShardsBackoffTimeInMillis RetrievalConfig 呼叫 ListShards 發生錯誤時將等待的間隔毫秒數。
maxListShardsRetryAttempts RetrievalConfig ListShards 在放棄之前重試的次數上限。

閒置時間移除項目

1.x 版 KCL 的 idleTimeBetweenReadsInMillis 對應於兩種計量:

  • 任務分派檢查的間隔時間量。您現在可以透過設定 CoordinatorConfig#shardConsumerDispatchPollIntervalMillis,設定各任務的此一間隔時間。

  • 當 Kinesis Data Streams 未傳回任何記錄時將休眠的時間量。在 2.0 版中,具強化廣發功能的記錄是自其各自的擷取器推送。僅當推送的請求送達時,碎片消費者才會發生活動。

用戶端組態移除項目

在 2.0 版中,KCL 不再建立用戶端。其端賴使用者提供有效的用戶端。基於此項變更,所有控制用戶端建立的組態參數皆已移除。若您需要這類參數,可以先就用戶端進行所需設定再將用戶端提供予 ConfigsBuilder

已移除的欄位 等效組態
kinesisEndpoint 使用慣用的端點設定開發套件 KinesisAsyncClientKinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()
dynamoDBEndpoint 使用慣用的端點設定開發套件 DynamoDbAsyncClientDynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()
kinesisClientConfig 使用所需的組態設定開發套件 KinesisAsyncClientKinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build()
dynamoDBClientConfig 使用所需的組態設定開發套件 DynamoDbAsyncClientDynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build()
cloudWatchClientConfig 使用所需的組態設定開發套件 CloudWatchAsyncClientCloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build()
regionName 使用慣用的區域設定開發套件。所有開發套件用戶端的做法皆相同。例如 KinesisAsyncClient.builder().region(Region.US_WEST_2).build()