翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Kinesis クライアントライブラリの使用
KDS データストリームからデータを処理できるカスタムコンシューマアプリケーションを開発する方法の 1 つは、Kinesis クライアントライブラリ (KCL) を使用することです。
トピック
注記
KCL 1.x と KCL 2.x の両方で、使用シナリオに応じて、最新の KCL 1.x バージョンまたは KCL 2.x バージョンにアップグレードすることをお勧めします。KCL 1.x と KCL 2.x は、最新の依存関係とセキュリティパッチ、バグ修正、下位互換性のある新機能を含む新しいリリースで定期的に更新されます。詳細については、「」を参照してください。https://github.com/awslabs/amazon-kinesis-client/リリース
Kinesis クライアントライブラリとは何ですか?
KCL は、分散コンピューティングに関連する複雑なタスクの多くを処理することで、Kinesis データストリームからデータを消費および処理するのに役立ちます。これには、複数のコンシューマーアプリケーションインスタンス間での負荷分散、コンシューマーアプリケーションインスタンスの障害に対する応答、処理済みのレコードのチェックポイント作成、リシャーディングへの対応が挙げられます。KCL はこれらのサブタスクをすべて処理するため、カスタムレコード処理ロジックの記述に集中できます。
KCL は、で利用可能な Kinesis Data Streams API とは異なります。AWSSDK。Kinesis Data Streams API は、ストリームの作成、リシャーディング、レコードの入力と取得など、Kinesis Data Streams のさまざまな要素を管理するのに役立ちます。KCL は、これらすべてのサブタスクの抽象化レイヤーを提供します。具体的には、コンシューマーアプリケーションのカスタムデータ処理ロジックに集中できます。Kinesis Data Streams API の詳細については、Amazon Kinesis API リファレンス。
重要
KCL は Java ライブラリです。Java以外の言語Support は、MultiLangデーモン。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。たとえば、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangデーモン。さらに、MultiLangデーモンには、ユースケースに合わせてカスタマイズする必要のあるデフォルト設定例があります。AWS接続先のリージョン。の詳細については、「」を参照してください。MultiLangデーモンオンGitHub「」を参照してください。KCLMultiLangデーモンプロジェクト
KCL は、レコード処理ロジックと Kinesis Data Streams との仲介として機能します。KCL は次のタスクを実行します。
-
データストリームに接続します。
-
データストリーム内のシャードを列挙します。
-
リースを使用して、ワーカーとのシャードの関連付けを調整する
-
レコードプロセッサで管理する各シャードのレコードプロセッサをインスタンス化する
-
データストリームからデータレコードを取得します。
-
対応するレコードプロセッサにレコードを送信する
-
処理されたレコードのチェックポイントを作成する
-
ワーカーインスタンス数が変更されたとき、またはデータストリームがリシャード(シャードが分割またはマージされる)ときに、シャードワーカーの関連付け(リース)のバランスをとります。
KCL 使用可能なバージョン
現在、次のいずれかのサポートされている KCL バージョンを使用して、カスタムコンシューマーアプリケーションを構築できます。
-
KCL 1.x
詳細については、「」を参照してください。KCL 1.x コンシューマーの開発
-
KCL 2.x
詳細については、「」を参照してください。KCL 2.x コンシューマーの開発
KCL 1.x または KCL 2.x のいずれかを使用して、共有スループットを使用するコンシューマアプリケーションを構築できます。詳細については、「」を参照してくださいKCL を使用したスループット共有カスタムコンシューマーの開発
専用スループット(拡張ファンアウトコンシューマー)を使用するコンシューマアプリケーションを構築するには、KCL 2.x のみを使用できます。詳細については、「」を参照してくださいスループット専有 (拡張ファンアウト) カスタムコンシューマーの開発
KCL 1.x と KCL 2.x の違い、および KCL 1.x から KCL 2.x に移行する方法については、を参照してください。コンシューマーを KCL 1.x から KCL 2.x に移行する。
KCL の概念
-
KCL コンシューマーアプリケーション— KCL を使用してカスタムビルドされ、データストリームからレコードを取得して処理するように設計されたアプリケーション。
-
コンシューマアプリケーションインスタンス-KCL コンシューマアプリケーションは、通常、障害時の調整とデータレコード処理の負荷分散のために、1 つ以上のアプリケーションインスタンスが同時に実行され、分散されます。
-
ワーカー— KCL コンシューマーアプリケーションインスタンスがデータの処理を開始するために使用する高レベルクラス。
重要
各 KCL コンシューマーアプリケーションインスタンスには 1 つのワーカーがあります。
ワーカーは、シャードとリース情報の同期、シャード割り当ての追跡、シャードからのデータの処理など、さまざまなタスクを初期化し、監督します。ワーカーは KCL に、この KCL コンシューマアプリケーションが処理しようとしているデータを記録するデータストリームの名前など、コンシューマアプリケーションの設定情報を KCL に提供します。AWSこのデータストリームにアクセスするために必要な資格情報。ワーカーは、その特定の KCL コンシューマアプリケーションインスタンスを開始して、データストリームからレコードプロセッサにデータレコードを配信します。
重要
KCL 1.x では、このクラスが呼び出されます。ワーカー。詳細については、(Java KCL リポジトリです)、を参照してください。https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/Amazonaws/services/kinesis/clientLibrary/lib/worker/worker.java
。KCL 2.x では、このクラスはスケジューラー。KCL 2.x のスケジューラの目的は、KCL 1.x のワーカーの目的と同じです。KCL 2.x のスケジューラクラスの詳細については、を参照してください。https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java 。 -
リースリース— ワーカーとシャード間のバインディングを定義するデータ。分散型 KCL コンシューマアプリケーションは、リースを使用して、複数のワーカー間でデータレコード処理を分割します。任意の時点で、データレコードの各シャードは、LeaseKey変数.
デフォルトでは、作業者は 1 つ以上のリースを保持できます(maxLeasesForワーカー変数) を同時に実行できます。
重要
すべてのワーカーは、データストリーム内の利用可能なすべてのシャードについて、利用可能なすべてのリースを保持すると競合します。しかし、一度に各リースを正常に保持できるのは1人のワーカーだけです。
たとえば、4 つのシャードを持つデータストリームを処理しているワーカー A を持つコンシューマアプリケーションインスタンス A がある場合、ワーカー A はシャード 1、2、3、および 4 へのリースを同時に保持できます。ただし、コンシューマアプリケーションインスタンスが 2 つある場合は、次のようになります。A と B はワーカー A とワーカー B で、これらのインスタンスは 4 つのシャードを持つデータストリームを処理しています。ワーカー A とワーカー B はシャード 1 へのリースを同時に保持できません。あるワーカーは、このシャードのデータレコードの処理を停止する準備ができるまで、または失敗するまで、特定のシャードへのリースを保持します。ある作業者がリースの保留を停止すると、別の作業者がリースを引き取り、保留します。
詳細については、(Java KCL リポジトリです)、を参照してください。https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/lease.java
KCL 1.x およびhttps://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java KCL 2.x の場合. -
リーステーブル-KCL コンシューマーアプリケーションのワーカーによってリースおよび処理されている KDS データストリーム内のシャードを追跡するために使用される一意の Amazon DynamoDB テーブル。リーステーブルは、KCL コンシューマアプリケーションの実行中に、データストリームからの最新のシャード情報と(ワーカー内およびすべてのワーカー間で)同期を維持する必要があります。詳細については、「」を参照してくださいリーステーブルを使用して KCL コンシューマアプリケーションによって処理されたシャードを追跡する
-
レコードプロセッサ— KCL コンシューマーアプリケーションがデータストリームから取得したデータを処理する方法を定義するロジック。実行時に、KCL コンシューマアプリケーションインスタンスがワーカーをインスタンス化し、このワーカーは、リースを保持するシャードごとに 1 つのレコードプロセッサをインスタンス化します。
リーステーブルを使用して KCL コンシューマアプリケーションによって処理されたシャードを追跡する
リーステーブルとは何ですか
Amazon Kinesis Data Streams アプリケーションごとに、KCL は一意のリーステーブル(Amazon DynamoDB テーブルに格納されている)を使用して、KCL コンシューマーアプリケーションのワーカーによってリースおよび処理される KDS データストリーム内のシャードを追跡します。
重要
KCL は、コンシューマーアプリケーションの名前を使用して、このコンシューマーアプリケーションが使用するリーステーブル名を作成します。そのため、各コンシューマーアプリケーション名は一意である必要があります。
リーステーブルは、Amazon DynamoDB コンソールコンシューマアプリケーションの実行中。
KCL コンシューマーアプリケーションのリーステーブルがアプリケーションの起動時に存在しない場合は、いずれかのワーカーがこのアプリケーションのリーステーブルを作成します。
重要
アカウントには、Kinesis Data Streams 自体に関連するコストに加えて、DynamoDB テーブルに関連するコストが発生します。
リーステーブルの各行は、コンシューマーアプリケーションのワーカーによって処理中のシャードを表します。KCL コンシューマアプリケーションが 1 つのデータストリームのみを処理する場合は、leaseKey
シャード ID は、リーステーブルのハッシュキーです。例えばJava コンシューマアプリケーションの同じ KCL 2.x で複数のデータストリームを処理するとすると、LeaseKey の構造は次のようになります。account-id:StreamName:streamCreationTimestamp:ShardId
。 たとえば、111111111:multiStreamTest-1:12345:shardId-000000000336
。
シャード ID に加えて、各行には次のデータが含まれます。
-
チェックポイント: シャードの最新チェックポイントのシーケンス番号。この値はデータストリームのすべてのシャードで一意です。
-
checkpointSubSequence: 番号: Kinesis プロデューサーライブラリの集約機能を使用する場合、これはチェックポイントKinesis レコード内の個々のユーザレコードを追跡します。
-
leaseCounter: ワーカーのリースが他のワーカーに保持されていることをワーカーが検出できるように、リースのバージョニングに使用されます。
-
LeaseKey: リースの一意の識別子。各リースはデータストリームのシャードに固有のもので、一度に 1 つのワーカーで保持されます。
-
leaseOwner: このリースを保持しているワーカー。
-
ownerSwitchesSinceチェックポイント: 最後にチェックポイントが書き込まれてから、このリースのワーカーが何回変更されたかを示します。
-
parentShardId: 子シャードの処理を開始する前に、親シャードが完全に処理済みであることを確認するために使用します。これにより、レコードがストリームに入力されたのと同じ順序で処理されるようになります。
-
ハッシュレンジ: で使われる
PeriodicShardSyncManager
定期的に同期を実行して、リーステーブルで欠落しているシャードを見つけ、必要に応じてリースを作成します。注記
このデータは、KCL 1.14 および KCL 2.3 で始まるすべてのシャードのリーステーブルに存在します。の詳細
PeriodicShardSyncManager
リースとシャード間の定期的な同期については、を参照してください。KDS データストリームのシャードとリーステーブルの同期方法。 -
チャイルドシャード: で使われる
LeaseCleanupManager
をクリックし、子シャードの処理ステータスを確認し、親シャードをリーステーブルから削除できるかどうかを決定します。注記
このデータは、KCL 1.14 および KCL 2.3 で始まるすべてのシャードのリーステーブルに存在します。
-
ShardId: シャードの ID。
注記
このデータは、リーステーブルに該当する場合のみ存在します。Java コンシューマアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する。これは Java 用 KCL 2.x でのみサポートされており、Java の場合は KCL 2.3 以降で始まります。
-
ストリーム名次の形式のデータストリームの識別子。
account-id:StreamName:streamCreationTimestamp
。注記
このデータは、リーステーブルに該当する場合のみ存在します。Java コンシューマアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する。これは Java 用 KCL 2.x でのみサポートされており、Java の場合は KCL 2.3 以降で始まります。
スループット
Amazon Kinesis Data Streams アプリケーションでプロビジョニングされたスループットの例外が発生した場合は、DynamoDB テーブルのプロビジョニングされたスループットを増やす必要があります。KCL は、プロビジョニングされたスループットを 1 秒あたりの読み込み 10 回、1 秒あたりの書き込み 10 回、1 秒あたりの書き込み 10 回ですが、これはアプリケーションでは十分でない場合があります。たとえば、Amazon Kinesis Data Streams アプリケーションで、頻繁にチェックポイントを作成する場合や、多くのシャードで構成されるストリームを処理する場合は、より多くのスループットが必要になる可能性があります。
DynamoDB のプロビジョニングされたスループットについては、を参照してください。読み込み/書き込みキャパシティーモードそしてテーブルとデータの操作のAmazon DynamoDB 開発者ガイド。
KDS データストリームのシャードとリーステーブルの同期方法
KCL コンシューマーアプリケーションのワーカーは、リースを使用して特定のデータストリームからシャードを処理します。特定の時点でどのシャードをリースしているワーカーに関する情報は、リーステーブルに格納されます。KCL コンシューマアプリケーションの実行中は、リーステーブルがデータストリームからの最新のシャード情報と同期している必要があります。KCL は、コンシューマアプリケーションのブートストラップ(コンシューマアプリケーションの初期化時または再起動時)、および処理中のシャードが終了(リシャーディング)に達するたびに、Kinesis Data Streams サービスから取得したシャード情報とリーステーブルを同期します。つまり、ワーカーまたは KCL コンシューマアプリケーションは、最初のコンシューマアプリケーションのブートストラップ中、およびコンシューマアプリケーションでデータストリームリシャードイベントが発生するたびに、処理中のデータストリームと同期されます。
KCL 1.0-1.13 と KCL 2.0-2.2 での同期
KCL 1.0-1.13 および KCL 2.0-2.2 では、コンシューマアプリケーションのブートストラップ、および各データストリームのリシャードイベント中に、KCL は、Kinesis Data Streams サービスから取得したシャード情報とリーステーブルを同期します。ListShards
またはDescribeStream
検出 API。上記のすべての KCL バージョンで、KCL コンシューマーアプリケーションの各ワーカーは、コンシューマアプリケーションのブートストラップ中および各ストリームリシャードイベントでリース/シャード同期プロセスを実行するために、次の手順を完了します。
-
処理中のストリームのデータのすべてのシャードをフェッチします
-
リーステーブルからすべてのシャードリースをフェッチします。
-
リーステーブルにリースのないオープンシャードをフィルタで除外します。
-
見つかったすべてのオープンシャードと、開いている親を持たない各オープンシャードについて反復処理します。
-
階層ツリーをその祖先パスを通過して、シャードが子孫であるかどうかを判断します。祖先シャードが処理されている場合 (リーステーブルに祖先シャードのリースエントリが存在する場合)、または祖先シャードを処理する必要がある場合 (たとえば、初期位置が
TRIM_HORIZON
またはAT_TIMESTAMP
) -
コンテキスト内のオープンシャードが子孫である場合、KCL は初期位置に基づいてシャードをチェックポイントし、必要に応じて親のリースを作成します。
-
KCL 2.x での同期。KCL 2.3 以降で始まる
サポートされている最新バージョンの KCL 2.x (KCL 2.3) 以降では、ライブラリで同期プロセスに対する次の変更がサポートされるようになりました。これらのリース/シャード同期の変更により、KCL コンシューマーアプリケーションから Kinesis Data Streams サービスに対して実行される API 呼び出しの数が大幅に削減され、KCL コンシューマアプリケーションのリース管理が最適化されます。
-
アプリケーションのブートストラップ中に、リーステーブルが空の場合、KCL は
ListShard
API のフィルタリングオプション (ShardFilter
オプションのリクエストパラメータ)。指定した時刻にオープンしているシャードのスナップショットについてのみリースを取得して作成します。ShardFilter
パラメータ。-ShardFilter
パラメータを使用すると、ListShards
アピ。の唯一の必須プロパティShardFilter
パラメータは次のとおりです。Type
。KCL はType
filter プロパティとその次の有効な値を使用して、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別して返します。-
AT_TRIM_HORIZON
-応答には、で開いていたすべてのシャードが含まれますTRIM_HORIZON
。 -
AT_LATEST
-応答には、データストリームの現在開いているシャードのみが含まれます。 -
AT_TIMESTAMP
-応答には、開始タイムスタンプが指定されたタイムスタンプより後または同じ、かつ終了タイムスタンプが指定されたタイムスタンプより後または同じ、または同じ、または同じ、または同じであるすべてのシャードが含まれます。
ShardFilter
空のリーステーブルのリースを作成して、で指定したシャードのスナップショットのリースを初期化するときに使用されます。RetrievalConfig#initialPositionInStreamExtended
。ShardFilter
の詳細については、「https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html」を参照してください。 -
-
すべてのワーカーがリース/シャード同期を実行して、データストリーム内の最新のシャードでリーステーブルを最新の状態に保つ代わりに、選択された単一のワーカーリーダーがリース/シャードの同期を実行します。
-
KCL 2.3 は
ChildShards
のパラメータを返すGetRecords
とSubscribeToShard
で発生するリース/シャード同期を実行する APISHARD_END
クローズドシャードの場合、KCL ワーカーが処理を終了したシャードの子シャードに対してのみリースを作成できるようにします。コンシューマアプリケーション全体で共有する場合、リース/シャード同期のこの最適化では、ChildShards
のパラメータGetRecords
アピ。専用スループット(拡張ファンアウト)コンシューマアプリケーションでは、リース/シャード同期のこの最適化では、ChildShards
のパラメータSubscribeToShard
アピ。詳細については、「」を参照してください。GetRecords,SubscribeToシャード, およびChildShard。 -
上記の変更により、KCLの動作は、既存のすべてのシャードについて学習するすべての労働者のモデルから、各作業者が所有するシャードの子シャードについてのみ学習する労働者のモデルに移行しています。したがって、コンシューマアプリケーションのブートストラップおよびリシャードイベント中に発生する同期に加えて、KCL は、リーステーブル内の潜在的なホールを特定するために、追加の定期的なシャード/リーススキャンを実行して(つまり、すべての新しいシャードについて学習する)、データストリームのハッシュ範囲が処理されており、必要に応じてリースを作成します。
PeriodicShardSyncManager
定期的なリース/シャードスキャンの実行を担当するコンポーネントです。の詳細
PeriodicShardSyncManager
KCL 2.3 では、「」を参照してください。https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/LeaseManagementConfig.java #L201-L213。 KCL 2.3 では、新しい設定オプションを設定できるようになりました。
PeriodicShardSyncManager
にLeaseManagementConfig
:[Name] (名前) デフォルト値 説明 leasesRecoveryAuditorExecutionFrequencyミリス 120000(2分)
リース・テーブルで部分的なリースをスキャンする監査ジョブの頻度 (ミリ単位)。オーディターがストリームのリースの穴を検出すると、次の基準に基づいてシャード同期がトリガーされます。
leasesRecoveryAuditorInconsistencyConfidenceThreshold
。leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
リーステーブル内のデータ・ストリームのリースが矛盾しているかどうかを判断するための、定期的な監査ジョブの信頼しきい値。監査者がデータ・ストリームに対して同じ不整合セットを何度も繰り返し検出すると、シャード同期がトリガーされます。
新規CloudWatchメトリックスも発行され、
PeriodicShardSyncManager
。詳細については、「」を参照してくださいPeriodicShardSyncManager -
への最適化を含める
HierarchicalShardSyncer
シャードの 1 つのレイヤーに対してのみリースを作成します。
KCL 1.x での同期。KCL 1.14 以降から始まる
サポートされている最新バージョンの KCL 1.x (KCL 1.14) 以降では、ライブラリで同期プロセスに対する次の変更がサポートされるようになりました。これらのリース/シャード同期の変更により、KCL コンシューマーアプリケーションから Kinesis Data Streams サービスに対して実行される API 呼び出しの数が大幅に削減され、KCL コンシューマアプリケーションのリース管理が最適化されます。
-
アプリケーションのブートストラップ中に、リーステーブルが空の場合、KCL は
ListShard
API のフィルタリングオプション (ShardFilter
オプションのリクエストパラメータ)。指定した時刻にオープンしているシャードのスナップショットについてのみリースを取得して作成します。ShardFilter
パラメータ。-ShardFilter
パラメータを使用すると、ListShards
アピ。の唯一の必須プロパティShardFilter
パラメータは次のとおりです。Type
。KCL はType
filter プロパティとその次の有効な値を使用して、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別して返します。-
AT_TRIM_HORIZON
-応答には、で開いていたすべてのシャードが含まれますTRIM_HORIZON
。 -
AT_LATEST
-応答には、データストリームの現在開いているシャードのみが含まれます。 -
AT_TIMESTAMP
-応答には、開始タイムスタンプが指定されたタイムスタンプより後または同じ、かつ終了タイムスタンプが指定されたタイムスタンプより後または同じ、または同じ、または同じ、または同じであるすべてのシャードが含まれます。
ShardFilter
空のリーステーブルのリースを作成して、で指定したシャードのスナップショットのリースを初期化するときに使用されます。KinesisClientLibConfiguration#initialPositionInStreamExtended
。ShardFilter
の詳細については、「https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html」を参照してください。 -
-
すべてのワーカーがリース/シャード同期を実行して、データストリーム内の最新のシャードでリーステーブルを最新の状態に保つ代わりに、選択された単一のワーカーリーダーがリース/シャードの同期を実行します。
-
KCL 1.14 は
ChildShards
のパラメータを返すGetRecords
とSubscribeToShard
で発生するリース/シャード同期を実行する APISHARD_END
クローズドシャードの場合、KCL ワーカーが処理を終了したシャードの子シャードに対してのみリースを作成できるようにします。詳細については、「」を参照してください。GetRecordsそしてChildShard。 -
上記の変更により、KCLの動作は、既存のすべてのシャードについて学習するすべての労働者のモデルから、各作業者が所有するシャードの子シャードについてのみ学習する労働者のモデルに移行しています。したがって、コンシューマアプリケーションのブートストラップおよびリシャードイベント中に発生する同期に加えて、KCL は、リーステーブル内の潜在的なホールを特定するために、追加の定期的なシャード/リーススキャンを実行して(つまり、すべての新しいシャードについて学習する)、データストリームのハッシュ範囲が処理されており、必要に応じてリースを作成します。
PeriodicShardSyncManager
定期的なリース/シャードスキャンの実行を担当するコンポーネントです。メトリック
KinesisClientLibConfiguration#shardSyncStrategyType
は、 に設定されます。ShardSyncStrategyType.SHARD_END
,PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold
は、シャード同期を強制するために、リーステーブル内のホールを含む連続スキャンの数のしきい値を決定するために使用されます。メトリックKinesisClientLibConfiguration#shardSyncStrategyType
は、 に設定されます。ShardSyncStrategyType.PERIODIC
,leasesRecoveryAuditorInconsistencyConfidenceThreshold
は無視されます。の詳細
PeriodicShardSyncManager
KCL 1.14では、https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientLibrary/lib/worker/KinesisClientLibConfiguration.java #L987-L999。 KCL 1.14 では、新しいコンフィギュレーションオプションを設定できるようになりました
PeriodicShardSyncManager
にLeaseManagementConfig
:[Name] (名前) デフォルト値 説明 leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
リーステーブル内のデータ・ストリームのリースが矛盾しているかどうかを判断するための、定期的な監査ジョブの信頼しきい値。監査者がデータ・ストリームに対して同じ不整合セットを何度も繰り返し検出すると、シャード同期がトリガーされます。
新規CloudWatchメトリックスも発行され、
PeriodicShardSyncManager
。詳細については、「」を参照してくださいPeriodicShardSyncManager -
KCL 1.14 は、遅延リースのクリーンアップもサポートするようになりました。リースは次の方法で非同期的に削除されます。
LeaseCleanupManager
到達時にSHARD_END
、シャードがデータストリームの保持期間を過ぎたか、リシャーディング操作の結果としてクローズされたとき。設定に新しい設定オプションを使用できるようになりました。
LeaseCleanupManager
。[Name] (名前) デフォルト値 説明 leaseCleanupIntervalミリス 1 分
リース・クリーンアップ・スレッドを実行する間隔。
completedLeaseCleanupIntervalMillis 5 分 リースが完了したかどうかをチェックする間隔。
garbageLeaseCleanupIntervalMillis 30 分 リースがガベージであるかどうかをチェックする間隔(つまり、データ・ストリームの保持期間を過ぎてトリミング)。
-
への最適化を含める
KinesisShardSyncer
シャードの 1 つのレイヤーに対してのみリースを作成します。
Java コンシューマアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する
このセクションでは、複数のデータストリームを同時に処理できる KCL コンシューマアプリケーションを作成できる KCL 2.x for Java での次の変更について説明します。
重要
マルチストリーム処理は KCL 2.x for Java でのみサポートされており、Java については KCL 2.3 以降。
KCL 2.x を実装できる他の言語では、マルチストリーム処理はサポートされていません。
マルチストリーム処理は KCL 1.x のどのバージョンでもサポートされていません。
-
MultistreamTracker インターフェイス
複数のストリームを同時に処理できるコンシューマアプリケーションを構築するには、という新しいインターフェイスを実装する必要があります。MultistreamTracker
。このインターフェイスには、 streamConfigList
KCL コンシューマアプリケーションで処理されるデータストリームとその設定のリストを返すメソッド。処理中のデータストリームは、コンシューマアプリケーションのランタイム中に変更できることに注意してください。streamConfigList
は、処理するデータストリームの変更について学習するために KCL によって定期的に呼び出されます。-
streamConfigList
メソッドの入力StreamConfigリスト. package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
以下の点に注意してください。
StreamIdentifier
そしてInitialPositionInStreamExtended
は必須フィールドですが、consumerArn
は省略可能です。以下の情報が必要です。consumerArn
KCL 2.x を使用して拡張ファンアウトコンシューマアプリケーションを実装している場合に限ります。の詳細
StreamIdentifier
「」を参照してください。https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/ソフトウェア/amazon/kinesis/StreamIdentifier.java #L29。のマルチストリームインスタンスを作成できます。 StreamIdentifier
シリアライズされたストリーム識別子から。シリアライズされたストリーム識別子は、次の形式にする必要があります。account-id:StreamName:streamCreationTimestamp
。* @param streamIdentifierSer * @return StreamIdentifier */ public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) { if (PATTERN.matcher(streamIdentifierSer).matches()) { final String[] split = streamIdentifierSer.split(DELIMITER); return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); } else { throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer); } }
MultistreamTracker
リーステーブル内の古いストリームのリースを削除するための戦略も含まれます (formerStreamsLeasesDeletionStrategy
). コンシューマアプリケーションのランタイム中は、ストラテジーを変更できないことに注意してください。詳細については、「」を参照してください。https://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/ソフトウェア/amazon/kinesis/FormerStreamsLeasesDeletionStrategy.java -
ConfigsBuilder
は、アプリケーション全体のクラスで、KCL コンシューマアプリケーションの構築時に使用する KCL 2.x の構成設定をすべて指定するために使用できます。 ConfigsBuilder
クラスは現在、MultistreamTracker
インターフェイスからリクエスト. 初期化できますConfigsBuilderレコードを消費する 1 つのデータストリームの名前を指定します。/** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
または、初期化することもできますConfigsBuilderと
MultiStreamTracker
複数のストリームを同時に処理する KCL コンシューマアプリケーションを実装する場合。* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
KCL コンシューマアプリケーションにマルチストリームサポートが実装されているため、アプリケーションのリーステーブルの各行に、このアプリケーションが処理する複数のデータストリームのシャード ID とストリーム名が含まれます。
-
KCL コンシューマアプリケーションのマルチストリームサポートが実装されている場合、LeaseKey は次の構造を取ります。
account-id:StreamName:streamCreationTimestamp:ShardId
。 たとえば、111111111:multiStreamTest-1:12345:shardId-000000000336
。重要
既存の KCL コンシューマアプリケーションが 1 つのデータストリームのみを処理するように設定されている場合、LeaseKey(リーステーブルのハッシュキー)はシャード ID になります。この既存の KCL コンシューマアプリケーションを複数のデータストリームを処理するように再構成すると、リーステーブルが壊れます。マルチストリームサポートでは LeaseKey 構造体は次のようになっている必要があるためです。
account-id:StreamName:StreamCreationTimestamp:ShardId
。
Kinesis クライアントライブラリとAWSスキーマレジストリ
Kinesis データストリームは、AWSスキーマレジストリ Glue -AWSGlue Schema Registry を使用すると、スキーマを一元的に検出、制御し、発展させることができ、生成されたデータが登録済みスキーマによって継続的に検証されます。スキーマは、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。-AWSGlue スキーマレジストリを使用すると、end-to-endストリーミングアプリケーション内のデータ品質とデータガバナンス。詳細については、「」を参照してください。AWSスキーマレジストリ。この統合を設定する方法の 1 つは、Java の KCL を使用する方法です。
重要
現在 Kinesis Data Streams とAWSGlue スキーマレジストリ統合は、Java で実装された KCL 2.3 コンシューマーを使用する Kinesis データストリームでのみサポートされます。多言語サポートは提供されていません。KCL 1.0 コンシューマーはサポートされていません。KCL 2.3 より前の KCL 2.x コンシューマはサポートされていません。
KCL を使用して KKinesis Data Streams とスキーマレジストリの統合を設定する方法の詳細については、の「KPL/KCL ライブラリを使用したデータの操作」セクションを参照してください。ユースケース: Amazon Kinesis Data Streams とAWSスキーマレジストリ。