Amazon Kinesis Data Streams
開発者ガイド

Kinesis Client Library 1.x から 2.x への移行

このトピックでは、Kinesis Client Library (KCL) のバージョン 1.x と 2.x の違いについて説明します。また、コンシューマーを KCL のバージョン 1.x からバージョン 2.x に移行する方法も示します。クライアントを移行すると、最後にチェックポイントが作成された場所からレコードの処理が開始されます。

KCL のバージョン 2.0 では、以下のインターフェイスの変更が導入されています。

KCL インターフェイスの変更

KCL 1.x インターフェイス KCL 2.0 インターフェイス
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware software.amazon.kinesis.processor.ShardRecordProcessor 内に折りたたみ

レコードプロセッサの移行

以下の例は、KCL1.x に実装されたレコードプロセッサを示しています。

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }

レコードプロセッサのクラスを移行するには

  1. インターフェイスを com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor および com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware から software.amazon.kinesis.processor.ShardRecordProcessor に変更します。以下に例を示します。

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. initialize メソッド processRecords とメソッドの import ステートメントを更新します。

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. shutdown メソッドを以下の新しいメソッドに置き換えます。leaseLostshardEnded、および shutdownRequested

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

以下に示しているのは、レコードプロセッサのクラスの更新されたバージョンです。

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

レコードプロセッサファクトリーの移行

レコードプロセッサファクトリーは、リースが取得された際にレコードプロセッサの作成を担当します。以下に示しているのは、KCL 1.x ファクトリーの例です。

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }

レコードプロセッサファクトリーを移行するには

  1. 実装されているインターフェイスを com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory から software.amazon.kinesis.processor.RecordProcessorFactory に変更します。以下に例を示します。

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. createProcessr の戻り署名を変更します。

    // public IRecordProcessor createProcessor() { public RecordProcessor shardRecordProcessor() {

以下は、2.0 のレコードプロセッサファクトリーの例です。

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

Amazon Kinesis クライアントの設定

Kinesis Client Library のリリース 2.0 では、クライアントの設定が単一の設定クラス (KinesisClientLibConfiguration) から 6 つの設定クラスに移行されました。次の表で移行を説明します。

設定フィールドとその新しいクラス

元のフィールド 新しい設定クラス 説明
applicationName ConfigsBuilder この KCL アプリケーションの名前。tableName および consumerName のデフォルトとして使用されます。
tableName ConfigsBuilder Amazon DynamoDB リーステーブルで使用されるテーブル名の上書きを許可します。
streamName ConfigsBuilder このアプリケーションがレコードを処理するストリームの名前。
kinesisEndpoint ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
dynamoDBEndpoint ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
initialPositionInStream RetrievalConfig なし
kinesisCredentialsProvider ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
dynamoDBCredentialsProvider ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
cloudWatchCredentialsProvider ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
failoverTimeMillis LeaseManagementConfig リース所有者が失敗したとみなすまでの経過時間 (ミリ秒)。
workerIdentifier ConfigsBuilder このアプリケーションプロセッサのインスタンス化を表す一意の識別子。一意である必要があります。
shardSyncIntervalMillis LeaseManagementConfig シャード同期コールの間隔。
maxRecords PollingConfig Kinesis が返すレコードの最大数の設定を許可します。
idleTimeBetweenReadsInMillis CoordinatorConfig このオプションは削除されました。アイドル時間の削除を参照してください。
callProcessRecordsEvenForEmptyRecordList ProcessorConfig 設定すると、Kinesis から提供されたレコードがない場合でもレコードプロセッサが呼び出されます。
parentShardPollIntervalMillis CoordinatorConfig 親シャードが完了したかどうかを確認するためにレコードプロセッサがポーリングを行う頻度。
cleanupLeasesUponShardCompletion LeaseManagementConfig 設定すると、子リースの処理が開始されると即時にリースが削除されます。
ignoreUnexpectedChildShards LeaseManagementConfig 設定すると、開いているシャードがある子シャードは無視されます。これは、主に DynamoDB ストリーム 用です。
kinesisClientConfig ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
dynamoDBClientConfig ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
cloudWatchClientConfig ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
taskBackoffTimeMillis LifecycleConfig 失敗したタスクを再試行するまでの待機時間。
metricsBufferTimeMillis MetricsConfig CloudWatch メトリックスの発行を制御します。
metricsMaxQueueSize MetricsConfig CloudWatch メトリックスの発行を制御します。
metricsLevel MetricsConfig CloudWatch メトリックスの発行を制御します。
metricsEnabledDimensions MetricsConfig CloudWatch メトリックスの発行を制御します。
validateSequenceNumberBeforeCheckpointing CheckpointConfig このオプションは削除されました。チェックポイントシーケンス番号の検証を参照してください。
regionName ConfigsBuilder このオプションは削除されました。クライアント設定の削除を参照してください。
maxLeasesForWorker LeaseManagementConfig アプリケーションの単一のインスタンスが受け入れるリースの最大数。
maxLeasesToStealAtOneTime LeaseManagementConfig アプリケーションが同時にスティールを試みるリースの最大数。
initialLeaseTableReadCapacity LeaseManagementConfig Kinesis Client Library が新しい DynamoDB リーステーブルを作成する場合に使用する DynamoDB 読み取り IOPS。
initialLeaseTableWriteCapacity LeaseManagementConfig Kinesis Client Library が新しい DynamoDB リーステーブルを作成する場合に使用する DynamoDB 読み取り IOPS。
initialPositionInStreamExtended ConfigsBuilder アプリケーションが読み取りを開始するストリーム内の初期位置。これは最初のリースの作成時にのみ使用されます。
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig リーステーブルに既存のリースがある場合、シャードデータの同期を無効にします。TODO: KinesisEco-438
shardPrioritization CoordinatorConfig どのシャードの優先順位付けを使用するか。
shutdownGraceMillis 該当なし このオプションは削除されました。MultiLang の削除を参照してください。
timeoutInSeconds 該当なし このオプションは削除されました。MultiLang の削除を参照してください。
retryGetRecordsInSeconds PollingConfig GetRecords が失敗した場合の試行間隔の遅延時間を設定します。
maxGetRecordsThreadPool PollingConfig GetRecords に使用されるスレッドプールのサイズ。
maxLeaseRenewalThreads LeaseManagementConfig リース更新スレッドプールのサイズを制御します。アプリケーションが処理するリースの数が多いほど、このプールも大きくする必要があります。
recordsFetcherFactory PollingConfig ストリームから取得するフェッチャーを作成するために使用されるファクトリーの置換を許可します。
logWarningForTaskAfterMillis LifecycleConfig タスクが完了していない場合に警告がログに記録されるまでの待機期間。
listShardsBackoffTimeInMillis RetrievalConfig 障害が発生した場合に ListShards を呼び出す間隔 (ミリ秒)。
maxListShardsRetryAttempts RetrievalConfig 失敗とみなすまでの ListShards の再試行の最大回数。

アイドル時間の削除

KCL の 1.x バージョンでは、idleTimeBetweenReadsInMillis は 2 つの数量に相当します。

  • タスクの送信チェックの間隔。CoordinatorConfig#shardConsumerDispatchPollIntervalMillis を設定することで、タスク間の間隔を設定できるようになりました。

  • Kinesis Data Streams から返されるレコードがない場合に休止状態になるまでの時間。バージョン 2.0 では、拡張ファンアウトのレコードはそれぞれのレトリバーからプッシュされます。シャードコンシューマーのアクティビティは、プッシュされたリクエストが到着した場合にのみ発生します。

クライアント設定の削除

バージョン 2.0 では、KCL はクライアントを作成しなくなりました。有効なクライアントの提供はユーザーに任されます。この変更により、クライアントの作成を制御するすべての設定パラメータが削除されました。これらのパラメータが必要な場合は、クライアントを ConfigsBuilder に提供する前にクライアントで設定できます。

削除されたフィールド 同等の設定
kinesisEndpoint 優先エンドポイントを指定した SDK KinesisAsyncClient の設定: KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build().
dynamoDBEndpoint 優先エンドポイントを指定した SDK DynamoDbAsyncClient の設定: DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build().
kinesisClientConfig 必要な設定を指定した SDK KinesisAsyncClient の設定: KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build()
dynamoDBClientConfig 必要な設定を指定した SDK DynamoDbAsyncClient の設定: DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build()
cloudWatchClientConfig 必要な設定を指定した SDK CloudWatchAsyncClient の設定: CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build()
regionName 優先リージョンを指定して SDK を設定します。これは、すべての SDK クライアントで同じです。たとえば、KinesisAsyncClient.builder().region(Region.US_WEST_2).build() と指定します。