Kinesis クライアントライブラリの使用 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis クライアントライブラリの使用

KDS データストリームからデータを処理できるカスタムコンシューマーアプリケーションを開発する方法の 1 つは、Kinesis クライアントライブラリ (KCL) を使用する方法です。

Kinesis クライアントライブラリとは

KCL は、分散コンピューティングに関連する複雑なタスクの多くを処理することで、Kinesis データストリームからデータを消費および処理するのに役立ちます。これには、複数のコンシューマーアプリケーションインスタンス間での負荷分散、コンシューマーアプリケーションインスタンスの障害に対する応答、処理済みのレコードのチェックポイント作成、リシャーディングへの対応が挙げられます。KCL はこれらのサブタスクをすべて処理するため、カスタムレコード処理ロジックの作成に集中できます。

KCL は、KKinesis Data Streams API とは異なります。AWSSDK. Kinesis Data Streams API は、ストリームの作成、リシャーディング、レコードの入力と取得など、Kinesis Data Streams ストリームの多くの要素を管理するのに役立ちます。KCL は、これらすべてのサブタスクの抽象化レイヤーを提供します。特に、コンシューマーアプリケーションのカスタムデータ処理ロジックに集中できるようにします。Kinesis Data Streams API の詳細については、Amazon Kinesis API リファレンス

重要

KCL は Java ライブラリです。Java以外の言語のSupport は、MultiLangDaemonと呼ばれる多言語インタフェースを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用している場合、バックグラウンドで実行されます。たとえば、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、ユースケースに合わせてカスタマイズする必要のあるデフォルト設定例があります。たとえば、AWS接続先のリージョンです。GitHub の MultiLangDaemon の詳細については、KCL MultiLangDaemon

KCL は、レコード処理ロジックと KKinesis Data Streams 仲介として機能します。KCL は、次のタスクを実行します。

  • データストリームに接続する

  • データストリーム内のシャードを列挙します

  • リースを使用して、シャードとそのワーカーとの関連付けを調整します

  • レコードプロセッサで管理する各シャードのレコードプロセッサをインスタンス化する

  • データストリームからデータレコードを取得する

  • 対応するレコードプロセッサにレコードを送信する

  • 処理されたレコードのチェックポイントを作成する

  • ワーカーインスタンス数が変更されたとき、またはデータストリームが再変更されたとき(シャードが分割またはマージされた場合)、シャードワーカー関連(リース)のバランスをとる

KCL 使用できるバージョン

現在、以下のサポートされているバージョンの KCL のいずれかを使用して、カスタムコンシューマーアプリケーションを構築できます。

KCL 1.x または KCL 2.x のいずれかを使用して、共有スループットを使用するコンシューマーアプリケーションを構築できます。詳細については、「KCL を使用したスループット共有カスタムコンシューマーの開発」を参照してください。

専用スループット (拡張ファンアウトコンシューマー) を使用するコンシューマーアプリケーションを構築するには、KCL 2.x のみを使用できます。詳細については、「スループット専有 (拡張ファンアウト) カスタムコンシューマーの開発」を参照してください。

KCL 1.x と KCL 2.x の違い、および KCL 1.x から KCL 2.x への移行方法については、」コンシューマーを KCL 1.x から KCL 2.x に移行する

KCL 概念

  • KCL コンシューマーアプリケーション— KCL を使用してカスタムビルドされ、データストリームからレコードを取得して処理するように設計されたアプリケーションです。

  • コンシューマ・アプリケーション・インスタンス-KCL コンシューマーアプリケーションは、通常、分散され、1 つ以上のアプリケーションインスタンスが同時に実行され、障害を調整し、データレコード処理を動的に負荷分散します。

  • ワーカー— KCL コンシューマーアプリケーションインスタンスがデータの処理を開始するために使用する高レベルのクラスです。

    重要

    各 KCL コンシューマーアプリケーションインスタンスには、1 つのワーカーがあります。

    ワーカーは、シャードとリース情報の同期、シャード割り当ての追跡、シャードからのデータの処理など、さまざまなタスクを初期化し、監督します。ワーカーは、この KCL コンシューマーアプリケーションが処理するデータを記録するデータストリームの名前など、コンシューマーアプリケーションの構成情報を KCL に提供します。AWSこのデータストリームにアクセスするために必要な認証情報。ワーカーはまた、特定の KCL コンシューマーアプリケーションインスタンスを起動して、データストリームからレコードプロセッサにデータレコードを配信します。

    重要

    KCL 1.xでは、このクラスはワーカー。詳細については、(これらは Java KCL リポジトリです)、https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java。KCL 2.x では、このクラスはスケジューラー。KCL 2.x でのスケジューラの目的は、KCL 1.x でのワーカーの目的と同じです。KCL 2.x のスケジューラクラスの詳細については、」https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java

  • リース— ワーカーとシャード間のバインディングを定義するデータ。分散型 KCL コンシューマーアプリケーションは、リースを使用して、複数のワーカーにまたがってデータレコード処理を分割します。データレコードの各シャードは、いつでも特定のワーカーにバインドされ、leaseKey変数.

    デフォルトでは、ワーカーは 1 つ以上のリースを保持できます (マックスリースフォーワーカー変数) を同時にできます。

    重要

    すべてのワーカーは、データストリーム内の利用可能なすべてのシャードの利用可能なすべてのリースを保持するために競合します。しかし、一度に各リースを正常に保持できるワーカーは 1 人だけです。

    たとえば、4 つのシャードを持つデータストリームを処理しているワーカー A を持つコンシューマーアプリケーションインスタンス A がある場合、ワーカー A はシャード 1、2、3、4 のリースを同時に保持できます。しかし、コンシューマーアプリケーションインスタンスが2つある場合:A と B にワーカー A とワーカー B があり、これらのインスタンスは 4 つのシャードを持つデータストリームを処理しています。ワーカー A とワーカー B は両方ともシャード 1 へのリースを同時に保持できません。あるワーカーは、このシャードのデータレコードの処理を停止する準備が整うまで、または失敗するまで、特定のシャードへのリースを保持します。あるワーカーがリースの保持を停止すると、別のワーカーがリースを取得して保持します。

    詳細については、(これらは Java KCL リポジトリです)、https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.javaおよび KCL 1.x およびhttps://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.javaKCL 2.x 用

  • リースのテーブル-KCL コンシューマーアプリケーションのワーカーによってリースおよび処理される KDS データストリーム内のシャードを追跡するために使用される一意の Amazon DynamoDB テーブル。リーステーブルは、KCL コンシューマーアプリケーションの実行中に、データストリームの最新のシャード情報と (ワーカー内およびすべてのワーカー間で) 同期したままにする必要があります。詳細については、「リーステーブルを使用した KCL コンシューマーアプリケーションによって処理されるシャードの追跡」を参照してください。

  • レコードプロセッサ— KCL コンシューマーアプリケーションがデータストリームから取得したデータをどのように処理するかを定義するロジック。実行時に、KCL コンシューマーアプリケーションインスタンスはワーカーをインスタンス化し、このワーカーはリースを保持するシャードごとに 1 つのレコードプロセッサをインスタンス化します。

リーステーブルを使用した KCL コンシューマーアプリケーションによって処理されるシャードの追跡

賃貸借契約テーブルとは

Amazon Kinesis Data Streams アプリケーションごとに、KCL は一意のリーステーブル(Amazon DynamoDB テーブルに格納されている)を使用して、KCL コンシューマーアプリケーションのワーカーによってリースおよび処理される KDS データストリームのシャードを追跡します。

重要

KCL は、コンシューマーアプリケーションの名前を使用して、このコンシューマーアプリケーションが使用するリーステーブル名を作成します。そのため、各コンシューマーアプリケーション名は一意である必要があります。

リーステーブルを表示するには、Amazon DynamoDB コンソールコンシューマーアプリケーションの実行中に。

アプリケーションの起動時に KCL コンシューマーアプリケーションのリーステーブルが存在しない場合は、いずれかのワーカーがこのアプリケーションのリーステーブルを作成します。

重要

アカウントには、Kinesis Data Streams 自体に関連するコストに加えて、DynamoDB テーブルに関連するコストが発生します。

リーステーブルの各行は、コンシューマーアプリケーションのワーカーで処理中のシャードを表します。KCL コンシューマーアプリケーションが 1 つのデータストリームのみを処理する場合、leaseKeyこれはシャード ID です。これはリーステーブルのハッシュキーです。以下の場合Java コンシューマーアプリケーション用に同じ KCL 2.x を使用した複数のデータストリームの処理の場合、LeaseKey の構造は次のようになります。account-id:StreamName:streamCreationTimestamp:ShardId。 例:111111111:multiStreamTest-1:12345:shardId-000000000336

シャード ID に加えて、各行には次のデータが含まれます。

  • チェックポイント: シャードの最新チェックポイントのシーケンス番号。この値は、データストリームのすべてのシャードで一意です。

  • checkpointSubSequenceNumber: Kinesis 製作者ライブラリの集約機能を使用する場合、これはチェックポイントで、Kinesis レコード内の個々のユーザーレコードを追跡します。

  • leaseCounter: ワーカーのリースが他のワーカーに保持されていることをワーカーが検出できるように、リースのバージョニングに使用されます。

  • leaseKey: リースの固有識別子。各リースはデータストリームのシャードに固有のもので、一度に 1 つのワーカーで保持されます。

  • leaseOwner: このリースを保持しているワーカー。

  • ownerSwitchesSinceCheckpoint: 最後にチェックポイントが書き込まれてから、このリースのワーカーが何回変更されたかを示します。

  • parentShardId: 子シャードの処理を開始する前に、親シャードが完全に処理済みであることを確認するために使用します。これにより、レコードがストリームに入力されたのと同じ順序で処理されるようになります。

  • ハッシュ範囲: で使用されるPeriodicShardSyncManagerを使用して定期同期を実行し、リーステーブルで不足しているシャードを見つけ、必要に応じてリースを作成します。

    注記

    このデータは、KCL 1.14 および KCL 2.3 で始まるすべてのシャードのリーステーブルに存在します。の詳細PeriodicShardSyncManagerとリースとシャード間の定期的な同期については、リーステーブルと KDS データストリームのシャードとの同期方法

  • チャイルドシャード: で使用されるLeaseCleanupManagerをクリックして、子シャードの処理ステータスを確認し、親シャードをリーステーブルから削除できるかどうかを判断します。

    注記

    このデータは、KCL 1.14 および KCL 2.3 で始まるすべてのシャードのリーステーブルに存在します。

  • ShardId: シャードの ID。

    注記

    このデータは、リーステーブルに存在するのは、Java コンシューマーアプリケーション用に同じ KCL 2.x を使用した複数のデータストリームの処理。これは、Java 用 KCL 2.x でのみサポートされ、Java 用 KCL 2.3 以降ではサポートされます。

  • ストリーム名次の形式のデータストリームの識別子。account-id:StreamName:streamCreationTimestamp

    注記

    このデータは、リーステーブルに存在するのは、Java コンシューマーアプリケーション用に同じ KCL 2.x を使用した複数のデータストリームの処理。これは、Java 用 KCL 2.x でのみサポートされ、Java 用 KCL 2.3 以降ではサポートされます。

Throughput

Amazon Kinesis Data Streams アプリケーションがプロビジョニングされたスループットの例外を受け取った場合は、DynamoDB テーブルのプロビジョニングされたスループットを増やす必要があります。KCL は、プロビジョニングされたスループットは、1 秒あたりの読み込み 10 回、1 秒あたりの書き込み 10 回ですが、これがユーザーのアプリケーションで十分でない場合があります。たとえば、Amazon Kinesis Data Streams アプリケーションが頻繁にチェックポイントを作成する場合や、多くのシャードで構成されるストリームを処理する場合は、より多くのスループットが必要になる場合があります。

DynamoDB でプロビジョニングされたスループットの詳細については、読み込み/書き込みキャパシティーモードおよびテーブルとデータの操作()Amazon DynamoDB 開発者ガイド

リーステーブルと KDS データストリームのシャードとの同期方法

KCL コンシューマーアプリケーションのワーカーは、リースを使用して、特定のデータストリームからシャードを処理します。任意の時点でどのワーカーがどのシャードをリースしているかに関する情報は、リーステーブルに格納されます。リーステーブルは、KCL コンシューマーアプリケーションの実行中に、データストリームの最新のシャード情報と同期したままにする必要があります。KCL は、コンシューマーアプリケーションのブートストラップ (コンシューマーアプリケーションの初期化時または再起動時)、および処理中のシャードが終了するまで (リシャーディング)、Kinesis Data Streams サービスから取得したシャード情報とリーステーブルを同期します。つまり、ワーカーまたは KCL コンシューマーアプリケーションは、最初のコンシューマーアプリケーションのブートストラップ中およびコンシューマーアプリケーションがデータストリームの再シャーディングイベントに遭遇するたびに処理しているデータストリームと同期されます。

KCL 1.0-1.13 および KCL 2.0-2.2 での同期

KCL 1.0-1.13 および KCL 2.0-2.2 では、コンシューマーアプリケーションのブートストラップ中、および各データストリームのリシャードイベント中に、KCL は Kinesis Data Streams サービスから取得したシャード情報とリーステーブルを、ListShardsまたはDescribeStream検出 API. 上記のすべての KCL バージョンで、KCL コンシューマーアプリケーションの各ワーカーは、コンシューマーアプリケーションのブートストラップ中および各ストリームのリシャードイベントでリース/シャードの同期プロセスを実行するために、次の手順を完了します。

  • 処理中のストリームのデータのすべてのシャードを取得する

  • リーステーブルからすべてのシャードリースを取得します

  • リーステーブルにリースを持たないオープンシャードをフィルタリングします

  • 見つかったすべての開いているシャードと、開いている親を持たない開いているシャードごとに反復処理します。

    • 階層ツリーを祖先パスを通過し、シャードが子孫であるかどうかを判断します。祖先シャードが処理されている場合(祖先シャードのリースエントリがリーステーブルに存在する場合)、または祖先シャードを処理する必要がある場合(たとえば、初期位置がTRIM_HORIZONまたはAT_TIMESTAMP)

    • コンテキスト内で開いているシャードが子孫である場合、KCL は初期位置に基づいてシャードをチェックポイントし、必要に応じて親のリースを作成します。

KCL 2.x での同期、KCL 2.3 以降以降

サポートされている最新バージョンの KCL 2.x (KCL 2.3) 以降から、ライブラリは同期プロセスに対する次の変更をサポートするようになりました。これらのリース/シャード同期の変更により、KCL コンシューマーアプリケーションが Kinesis Data Streams サービスに対して行う API 呼び出しの数が大幅に削減され、KCL コンシューマーアプリケーションのリース管理が最適化されます。

  • アプリケーションのブートストラップ中に、リーステーブルが空の場合、KCL はListShardAPI のフィルタリングオプション (ShardFilterオプションの request パラメーター) を使用して、指定した時刻に開かれているシャードのスナップショットのリースを取得および作成するには、ShardFilterパラメータ。-ShardFilterパラメーターを使用すると、ListShardsAPI. の唯一の必須プロパティShardFilterパラメータがType。KCL はTypeフィルタープロパティとその有効値の次の値を使用して、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別して返します。

    • AT_TRIM_HORIZON-応答には、で開いていたすべてのシャードが含まれます。TRIM_HORIZON

    • AT_LATEST-応答には、データストリームの現在開いているシャードのみが含まれます。

    • AT_TIMESTAMP-応答には、開始タイムスタンプが指定されたタイムスタンプ以下で、終了タイムスタンプが指定されたタイムスタンプ以上であるか、またはまだ開いているすべてのシャードが含まれます。

    ShardFilterで指定されたシャードのスナップショットのリースを初期化するために、空のリーステーブルのリースを作成するときに使用されます。RetrievalConfig#initialPositionInStreamExtended

    ShardFilter の詳細については、「https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html」を参照してください。

  • リース/シャードの同期を実行するすべてのワーカーが、データストリーム内の最新のシャードでリーステーブルを最新の状態に保つ代わりに、1 人の選出されたワーカーリーダーがリース/シャードの同期を実行します。

  • KCL 2.3 はChildShardsの戻りパラメーターGetRecordsSubscribeToShardで発生するリース/シャードの同期を実行する APISHARD_ENDに設定され、KCL ワーカーは、処理が終了したシャードの子シャードのリースの作成のみを許可します。コンシューマーアプリケーション全体で共有する場合、リース/シャード同期のこの最適化では、ChildShardsのパラメータGetRecordsAPI. 専用スループット(拡張ファンアウト)コンシューマアプリケーションの場合、リース/シャード同期のこの最適化では、ChildShardsのパラメータSubscribeToShardAPI. 詳細については、「」を参照してください。GetRecords,SubscribeToShards, およびチャイルドシャード

  • 上記の変更により、KCL の動作は、既存のすべてのシャードについて学習するすべてのワーカーのモデルから、各ワーカーが所有するシャードの子シャードについてのみ学習するワーカーのモデルに移行しています。したがって、コンシューマーアプリケーションのブートストラップおよびリシャードイベント中に発生する同期に加えて、KCL はリーステーブル内の潜在的なホールを特定するため (つまり、すべての新しいシャードについて学習するため)、追加の定期シャード/リーススキャンも実行され、完全なハッシュ範囲が処理され、必要に応じてリースを作成します。PeriodicShardSyncManagerは、定期的なリース/シャードスキャンの実行を担当するコンポーネントです。

    の詳細PeriodicShardSyncManagerKCL 2.3 のhttps://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213

    KCL 2.3 では、新しい設定オプションを使用してPeriodicShardSyncManagerLeaseManagementConfig:

    名前 デフォルト値 説明
    リース回復オーディトリアクスキュレーション周波数ミリス

    120000(2分)

    リース・テーブル内の部分リースをスキャンする監査ジョブの頻度(ミリ単位)。監査がストリームのリースにホールを検出すると、leasesRecoveryAuditorInconsistencyConfidenceThreshold

    リース回復監査一貫性コンフィデンススレッショルド

    3

    定期監査ジョブの信頼度しきい値は、リーステーブル内のデータストリームのリースが矛盾しているかどうかを判断します。監査が、データストリームに対して同じ一連の矛盾を何度も連続して検出すると、シャード同期がトリガーされます。

    新しい CloudWatch メトリックスも発行され、PeriodicShardSyncManager。詳細については、「PeriodicShardSyncManager」を参照してください。

  • 最適化を含めるHierarchicalShardSyncerを使用して、シャードの 1 つのレイヤーに対してのみリースを作成します。

KCL 1.x での同期、KCL 1.14 以降以降

サポートされている最新バージョンの KCL 1.x (KCL 1.14) 以降から、ライブラリは同期プロセスに対する次の変更をサポートするようになりました。これらのリース/シャード同期の変更により、KCL コンシューマーアプリケーションが Kinesis Data Streams サービスに対して行う API 呼び出しの数が大幅に削減され、KCL コンシューマーアプリケーションのリース管理が最適化されます。

  • アプリケーションのブートストラップ中に、リーステーブルが空の場合、KCL はListShardAPI のフィルタリングオプション (ShardFilterオプションの request パラメーター) を使用して、指定した時刻に開かれているシャードのスナップショットのリースを取得および作成するには、ShardFilterパラメータ。-ShardFilterパラメーターを使用すると、ListShardsAPI. の唯一の必須プロパティShardFilterパラメータがType。KCL はTypeフィルタープロパティとその有効値の次の値を使用して、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別して返します。

    • AT_TRIM_HORIZON-応答には、で開いていたすべてのシャードが含まれます。TRIM_HORIZON

    • AT_LATEST-応答には、データストリームの現在開いているシャードのみが含まれます。

    • AT_TIMESTAMP-応答には、開始タイムスタンプが指定されたタイムスタンプ以下で、終了タイムスタンプが指定されたタイムスタンプ以上であるか、またはまだ開いているすべてのシャードが含まれます。

    ShardFilterで指定されたシャードのスナップショットのリースを初期化するために、空のリーステーブルのリースを作成するときに使用されます。KinesisClientLibConfiguration#initialPositionInStreamExtended

    ShardFilter の詳細については、「https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html」を参照してください。

  • リース/シャードの同期を実行するすべてのワーカーが、データストリーム内の最新のシャードでリーステーブルを最新の状態に保つ代わりに、1 人の選出されたワーカーリーダーがリース/シャードの同期を実行します。

  • KCL 1.14 はChildShardsの戻りパラメーターGetRecordsSubscribeToShardで発生するリース/シャードの同期を実行する APISHARD_ENDに設定され、KCL ワーカーは、処理が終了したシャードの子シャードのリースの作成のみを許可します。詳細については、「」を参照してください。GetRecordsおよびチャイルドシャード

  • 上記の変更により、KCL の動作は、既存のすべてのシャードについて学習するすべてのワーカーのモデルから、各ワーカーが所有するシャードの子シャードについてのみ学習するワーカーのモデルに移行しています。したがって、コンシューマーアプリケーションのブートストラップおよびリシャードイベント中に発生する同期に加えて、KCL はリーステーブル内の潜在的なホールを特定するため (つまり、すべての新しいシャードについて学習するため)、追加の定期シャード/リーススキャンも実行され、完全なハッシュ範囲が処理され、必要に応じてリースを作成します。PeriodicShardSyncManagerは、定期的なリース/シャードスキャンの実行を担当するコンポーネントです。

    メトリックKinesisClientLibConfiguration#shardSyncStrategyTypeは、 に設定されます。ShardSyncStrategyType.SHARD_END,PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThresholdは、リーステーブル内のホールを含む連続したスキャンの数のしきい値を決定するために使用されます。その後、シャード同期を強制します。メトリックKinesisClientLibConfiguration#shardSyncStrategyTypeは、 に設定されます。ShardSyncStrategyType.PERIODIC,leasesRecoveryAuditorInconsistencyConfidenceThresholdは無視されます。

    の詳細PeriodicShardSyncManagerKCL 1.14 では、https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999

    KCL 1.14 では、新しい設定オプションを使用してPeriodicShardSyncManagerLeaseManagementConfig:

    名前 デフォルト値 説明
    リース回復監査一貫性コンフィデンススレッショルド

    3

    定期監査ジョブの信頼度しきい値は、リーステーブル内のデータストリームのリースが矛盾しているかどうかを判断します。監査が、データストリームに対して同じ一連の矛盾を何度も連続して検出すると、シャード同期がトリガーされます。

    新しい CloudWatch メトリックスも発行され、PeriodicShardSyncManager。詳細については、「PeriodicShardSyncManager」を参照してください。

  • KCL 1.14 では、遅延リースのクリーンアップもサポートされるようになりました。リースはLeaseCleanupManagerに達するとSHARD_ENDは、シャードがデータストリームの保持期間を超えて期限切れになったか、リシャーディング操作の結果として閉じられた場合に発生します。

    新しい設定オプションを使用でき、LeaseCleanupManager

    名前 デフォルト値 説明
    リース・クリーナアップ・インターバルミリス

    1 分

    リースクリーンアップスレッドを実行する間隔。

    コンプリートリーアセクル・インターバルミリス 5 分

    リースが完了したかどうかをチェックする間隔。

    GarbageLeasecleanupIntervalMillis 30 分

    リースがガベージであるかどうかをチェックする間隔(つまり、データストリームの保持期間を超えてトリミングされる)。

  • 最適化を含めるKinesisShardSyncerを使用して、シャードの 1 つのレイヤーに対してのみリースを作成します。

Java コンシューマーアプリケーション用に同じ KCL 2.x を使用した複数のデータストリームの処理

このセクションでは、KCL 2.x for Java での次の変更点について説明します。これにより、複数のデータストリームを同時に処理できる KCL コンシューマアプリケーションを作成できます。

重要

マルチストリーム処理は、KCL 2.x for Java でのみサポートされ、Java for KCL 2.3 以降ではサポートされます。

マルチストリーム処理は、KCL 2.x を実装できる他の言語ではサポートされていません。

マルチストリーム処理は、KCL 1.x のどのバージョンでもサポートされていません。

  • マルチストリームトラッカーインターフェイス

    複数のストリームを同時に処理できるコンシューマーアプリケーションを構築するには、マルチストリームトラッカー。このインターフェイスには、streamConfigListメソッドを呼び出します。このメソッドは、KCL コンシューマーアプリケーションによって処理されるデータストリームとその構成のリストを返します。処理中のデータ・ストリームは、コンシューマ・アプリケーションの実行時に変更できることに注意してください。streamConfigListは、KCL によって定期的に呼び出され、処理するデータストリームの変更について学習します。

    -streamConfigListメソッドが入力されるストリームコンフィグリスト。

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    以下の点に注意してください。StreamIdentifierおよびInitialPositionInStreamExtendedは必須フィールドですが、consumerArnは省略可能です。以下を指定する必要がありますconsumerArnは、KCL 2.x を使用して拡張ファンアウトコンシューマーアプリケーションを実装する場合に限ります。

    の詳細StreamIdentifier「」を参照してください。https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java#L29。マルチストリームインスタンスを作成するには、StreamIdentifierをシリアル化されたストリーム識別子から返します。シリアライズされたストリーム識別子の形式は、次のとおりです。account-id:StreamName:streamCreationTimestamp

    * @param streamIdentifierSer * @return StreamIdentifier */ public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) { if (PATTERN.matcher(streamIdentifierSer).matches()) { final String[] split = streamIdentifierSer.split(DELIMITER); return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); } else { throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer); } }

    MultistreamTrackerには、リーステーブル内の古いストリームのリースを削除する方法も含まれています (formerStreamsLeasesDeletionStrategy). コンシューマーアプリケーションの実行時に戦略を変更できないことに注意してください。詳細については、「」を参照してください。https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy.java

  • ConfigsBuilderは、KCL コンシューマーアプリケーションを構築するときに使用する KCL 2.x 構成設定のすべてを指定するために使用できるアプリケーション全体のクラスです。ConfigsBuilderクラスのサポートが追加されましたMultistreamTrackerインターフェイスからリクエスト. ConfigsBuilder を初期化するには、以下のいずれかのデータストリームの名前を使用します。

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    または、ConfigsBuilderを初期化することもできますMultiStreamTracker複数のストリームを同時に処理する KCL コンシューマーアプリケーションを実装する場合。

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • KCL コンシューマーアプリケーション用に実装されたマルチストリームサポートにより、アプリケーションのリーステーブルの各行に、このアプリケーションが処理する複数のデータストリームのシャード ID とストリーム名が含まれるようになりました。

  • KCL コンシューマーアプリケーションのマルチストリームサポートが実装されると、LeaseKey は次の構造になります。account-id:StreamName:streamCreationTimestamp:ShardId。 例:111111111:multiStreamTest-1:12345:shardId-000000000336

    重要

    既存の KCL コンシューマーアプリケーションが 1 つのデータストリームのみを処理するように設定されている場合、LeaseKey (リーステーブルのハッシュキー) はシャード ID です。複数のデータストリームを処理するようにこの既存の KCL コンシューマーアプリケーションを再構成すると、リーステーブルが破損します。マルチストリームのサポートでは、LeaseKey 構造は次のようにする必要があります。account-id:StreamName:StreamCreationTimestamp:ShardId

Kinesis クライアントライブラリの使用AWSGlue スキーマレジストリ

Kinesis データストリームはAWSGlue スキーマレジストリ -AWSGlue スキーマレジストリを使用すると、生成されたデータが登録されたスキーマによって継続的に検証されると同時に、スキーマの検出、制御、および進化を一元的に実行できます。スキーマは、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータパブリケーション、消費、またはストレージのためのバージョン対応仕様です。-AWSGlue Schema Registry を使用すると、ストリーミングアプリケーション内のエンドツーエンドのデータ品質とデータガバナンスを向上させることができます。詳細については、「」を参照してください。AWSGlue スキーマレジストリ。この統合をセットアップする方法の 1 つは、Java で KCL を使用する方法です。

重要

現在、Kinesis Data Streams およびAWSGlue スキーマのレジストリ統合は、Java で実装された KCL 2.3 コンシューマーを使用する Kinesis データストリームでのみサポートされます。多言語サポートは提供されていません。KCL 1.0 コンシューマはサポートされていません。KCL 2.3 より前の KCL 2.x コンシューマはサポートされていません。

KCL を使用して KKinesis Data Streams とスキーマレジストリの統合を設定する方法の詳細については、」ユースケース: Amazon Kinesis Data Streams とAWSGlue スキーマレジストリ