Kinesis Client Library の使用 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis Client Library の使用

KDS データストリームからデータを処理できるカスタムコンシューマーアプリケーションを開発する方法の 1 つは、Kinesis Client Library (KCL) を使用することです。

注記

KCL 1.x と KCL 2.x については、どちらも使用シナリオに応じて最新の KCL 1.x バージョンまたは KCL 2.x バージョンにアップグレードすることをお勧めします。KCL 1.x と KCL 2.x は、どちらも新しいリリースに伴って定期的に更新されています。これには、最新の依存関係パッチ、セキュリティパッチ、バグ修正、および下位互換性のある新機能が含まれます。詳細については、https://github.com/awslabs/ amazon-kinesis-client /release を参照してください。

Kinesis Client Library (KCL) とは何ですか?

KCL は、分散コンピューティングに関連する複雑なタスクの多くを処理することで、Kinesis Data Streams からデータを消費および処理するのに役立ちます。これには、複数のコンシューマーアプリケーションインスタンス間での負荷分散、コンシューマーアプリケーションインスタンスの障害に対する応答、処理済みのレコードのチェックポイント作成、リシャーディングへの対応が挙げられます。KCL はこれらのサブタスクをすべて処理するため、カスタムレコード処理ロジックの記述に集中できます。

KCL は AWS SDK で使用できる Kinesis Data Streams API とは異なることに注意してください。Kinesis Data Streams API では、Kinesis Data Streams の多くの機能 (ストリームの作成、リシャーディング、レコードの入力と取得など) を管理できます。KCL は、これらすべてのサブタスクの抽象化レイヤーを提供します。具体的には、コンシューマーアプリケーションのカスタムデータ処理ロジックに集中できます。Kinesis Data Streams API の詳細については、Amazon Kinesis API リファレンスを参照してください。

重要

KCL は Java ライブラリです。Java 以外の言語Support は、と呼ばれる多言語インタフェースを使用して提供されます。 MultiLangDaemonこのデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。たとえば、Python 用 KCL をインストールし、コンシューマアプリケーション全体を Python で作成する場合でも、の理由により、システムに Java をインストールする必要があります。 MultiLangDaemonさらに MultiLangDaemon 、 AWS 接続先のリージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定がいくつかあります。詳細については GitHub、「KCL MultiLangDaemon プロジェクト」を参照してください。 MultiLangDaemon

KCL は、レコード処理ロジックと Kinesis Data Streams の仲介として機能します。KCL は、次のタスクを実行します。

  • データストリームに接続する

  • データストリーム内のシャードを列挙する

  • リースを使用して、ワーカーとシャードの関連付けを調整する

  • レコードプロセッサで管理する各シャードのレコードプロセッサをインスタンス化する

  • データストリームからデータレコードを取得する

  • 対応するレコードプロセッサにレコードを送信する

  • 処理されたレコードのチェックポイントを作成する

  • ワーカーインスタンス数が変更されたとき、またはデータストリームがリシャード (シャードが分割またはマージされる) ときに、シャードワーカーの関連付け (リース) のバランスをとります。

KCL 使用可能なバージョン

現在、次のいずれかのサポートされているバージョンの KCL を使用して、カスタムコンシューマーアプリケーションを構築できます。

KCL 1.x または KCL 2.x のいずれかを使用して、共有スループットを使用するコンシューマーアプリケーションを構築できます。詳細については、KCL を使用したスループット共有カスタムコンシューマーの開発を参照してください。

専用スループット (拡張ファンアウトコンシューマー) を使用するコンシューマーアプリケーションを構築するには、KCL 2.x のみを使用できます。詳細については、スループット専有 (拡張ファンアウト) カスタムコンシューマーの開発を参照してください。

KCL 1.x と KCL 2.x の違い、および KCL 1.x から KCL 2.x に移行する方法については、コンシューマーを KCL 1.x から KCL 2.x に移行するを参照してください。

KCL の概念

  • KCL コンシューマーアプリケーション - KCL を使用してカスタムビルドされ、データストリームからレコードを取得して処理するように設計されたアプリケーション。

  • コンシューマーアプリケーションインスタンス - KCL コンシューマーアプリケーションは、通常、障害時の調整とデータレコード処理の動的な負荷分散のために、1 つ以上のアプリケーションインスタンスが同時に実行され、分散されます。

  • ワーカー - KCL コンシューマーアプリケーションインスタンスがデータの処理を開始するために使用する高レベルクラス。

    重要

    各 KCL コンシューマーアプリケーションインスタンスには 1 つのワーカーがあります。

    ワーカーは、シャードとリース情報の同期、シャード割り当ての追跡、シャードからのデータの処理など、さまざまなタスクを初期化し、監督します。ワーカーは、この KCL コンシューマーアプリケーションが処理するデータレコードを含むデータストリームの名前や、 AWS このデータストリームにアクセスするために必要な認証情報など、コンシューマーアプリケーションの構成情報を KCL に提供します。ワーカーは、その特定の KCL コンシューマーアプリケーションインスタンスを開始して、データストリームからレコードプロセッサにデータレコードを配信します。

    重要

    KCL 1.x では、このクラスはワーカーと呼ばれます。詳細については (これらは Java KCL リポジトリです)、https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/ClientLibrary/lib/worker/Worker.java を参照してください。KCL 2.x では、このクラスはスケジューラと呼ばれます。KCL 2.x のスケジューラの目的は、KCL 1.x のワーカーの目的と同じです。KCL 2.x amazon-kinesis-clientのスケジューラクラスの詳細については、amazon-kinesis-client https://github.com/awslabs/ /blob/master/ /src/main/java/software/amazon/kinesis/coordinator/Scheduler.java を参照してください

  • リース - ワーカーとシャード間のバインディングを定義するデータ。分散型 KCL コンシューマーアプリケーションは、リースを使用して、複数のワーカー間でデータレコード処理を分割します。いつでも、データレコードの各シャードは、leaseKey によって識別されるリースによって特定のワーカーにバインドされます。

    デフォルトでは、1 人のワーカーは (Worker 変数の値に応じて) 1 つ以上のリースを同時に保有できます。maxLeasesFor

    重要

    すべてのワーカーは、データストリーム内の利用可能なすべてのシャードについて、利用可能なすべてのリースを保持すると競合します。しかし、一度に各リースを正常に保持できるのは1人のワーカーだけです。

    例えば、4 つのシャードを持つデータストリームを処理しているワーカー A を持つコンシューマーアプリケーションインスタンス A がある場合、ワーカー A はシャード 1、2、3、および 4 へのリースを同時に保持できます。ただし、2 つのコンシューマーアプリケーションインスタンス (ワーカー A とワーカー B を含む A と B) があり、これらのインスタンスが 4 つのシャードを持つデータストリームを処理している場合、ワーカー A とワーカー B はシャード 1 へのリースを同時に保持できません。あるワーカーは、このシャードのデータレコードの処理を停止する準備ができるまで、または失敗するまで、特定のシャードへのリースを保持します。あるワーカーがリースの保留を停止すると、別のワーカーがリースを引き取り、保留します。

    詳細については (これらは Java KCL リポジトリです)、KCL 1.x については https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java、KCL 2.x については https://github.com/awslabs/ /blob/master/ /src/main/java/software/amazon/kinesis/leases/Lease.java を参照してください。amazon-kinesis-client amazon-kinesis-client

  • リーステーブル - KCL コンシューマーアプリケーションのワーカーによってリースおよび処理されている KDS データストリーム内のシャードを追跡するために使用される一意の Amazon DynamoDB テーブル。リーステーブルは、KCL コンシューマーアプリケーションの実行中に、データストリームからの最新のシャード情報と (ワーカー内およびすべてのワーカー間で) 同期を維持する必要があります。詳細については、リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡するを参照してください。

  • レコードプロセッサ - KCL コンシューマーアプリケーションがデータストリームから取得したデータを処理する方法を定義するロジック。実行時に、KCL コンシューマーアプリケーションインスタンスがワーカーをインスタンス化し、このワーカーは、リースを保持するシャードごとに 1 つのレコードプロセッサをインスタンス化します。

リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する

リーステーブルとは何ですか?

それぞれの Amazon Kinesis Data Streams アプリケーションに、KCLは、一意のリーステーブル (Amazon DynamoDB テーブルに保存されている) を使用して、KCL コンシューマーアプリケーションのワーカーによってリースおよび処理されている KDS データストリーム内のシャードを追跡します。

重要

KCL は、コンシューマーアプリケーションの名前を使用して、このコンシューマーアプリケーションが使用するリーステーブル名を作成します。したがって、各コンシューマーアプリケーション名は一意である必要があります。

コンシューマーアプリケーションの実行中に、Amazon DynamoDB コンソールを使用してリーステーブルを表示できます。

アプリケーションの起動時にKCL コンシューマーアプリケーションのリーステーブルが存在しない場合は、いずれかのワーカーがこのアプリケーションのリーステーブルを作成します。

重要

アカウントには、Kinesis Data Streams 自体に関連するコストに加えて、DynamoDB テーブルに関連するコストが発生します。

テーブルの各行は、コンシューマーアプリケーションのワーカーによって処理中のシャードを表します。KCL コンシューマーアプリケーションが 1 つのデータストリームのみを処理する場合、リーステーブルのハッシュキー leaseKey はシャード ID です。Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する であれば、leaseKey の構造は次のようになります: account-id:StreamName:streamCreationTimestamp:ShardId。例えば 111111111:multiStreamTest-1:12345:shardId-000000000336 です。

シャード ID に加えて、各行には次のデータが含まれます。

  • checkpoint: シャードの最新チェックポイントのシーケンス番号。この値はストリームのすべてのシャードで一意です。

  • checkpointSubSequence番号:Kinesis プロデューサーライブラリの集計機能を使用する場合、これは Kinesis レコード内の個々のユーザーレコードを追跡するチェックポイントの拡張機能です

  • leaseCounter: ワーカーのリースが他のワーカーに保持されていることをワーカーが検出できるように、リースのバージョニングに使用されます。

  • leaseKey: リースの固有識別子。各リースはデータストリームのシャードに固有であり、一度に 1 つのワーカーで保持されます。

  • leaseOwner: このリースを保持しているワーカー。

  • ownerSwitchesSinceチェックポイント:前回チェックポイントが作成されてから、このリースによってワーカーが何回変更されたか。

  • parentShardId: 子シャードで処理を開始する前に、親シャードが完全に処理されていることを確認するために使用されます。これにより、レコードがストリームに入力されたのと同じ順序で処理されるようになります。

  • hashrange: PeriodicShardSyncManager で使われて、定期的な同期を実行し、リーステーブルで欠落しているシャードを見つけ、必要に応じてリースを作成します。

    注記

    このデータは、KCL 1.14 および KCL 2.3 で始まるすべてのシャードのリーステーブルに存在します。PeriodicShardSyncManager の詳細およびリースとシャード間の定期的な同期については、KDS データストリームのシャードとリーステーブルの同期方法 を参照してください。

  • childshards: LeaseCleanupManager で使われて、子シャードの処理ステータスを確認し、親シャードをリーステーブルから削除できるかどうかを決定します。

    注記

    このデータは、KCL 1.14 および KCL 2.3 で始まるすべてのシャードのリーステーブルに存在します。

  • shardID: シャードの ID。

    注記

    このデータは、Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する である場合にのみリーステーブルに存在します。これは Java 用 KCL 2.x でのみサポートされており、Java の場合は KCL 2.3 以降で始まります。

  • stream name 以下の形式のデータストリームの識別子: account-id:StreamName:streamCreationTimestamp

    注記

    このデータは、Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する である場合にのみリーステーブルに存在します。これは Java 用 KCL 2.x でのみサポートされており、Java の場合は KCL 2.3 以降で始まります。

スループット

Amazon Kinesis Data Streams アプリケーションでプロビジョニングされたスループットの例外が発生した場合は、DynamoDB テーブルのプロビジョニングされたスループットを増やす必要があります。KCL がテーブルを作成するときにプロビジョニングされるスループットは、1 秒あたりの読み込み 10 回、1 秒あたりの書き込み 10 回ですが、これがユーザーのアプリケーションで十分でない場合があります。例えば、Amazon Kinesis Data Streams アプリケーションが頻繁にチェックポイントを作成する場合や、多くのシャードで構成されるストリームを処理する場合は、より多くのスループットが必要になる可能性があります。

DynamoDB でプロビジョニングされたスループットについては、Amazon DynamoDB デベロッパーガイド読み取り/書き込み容量モードおよびテーブルとデータの操作を参照してください。

KDS データストリームのシャードとリーステーブルの同期方法

KCL コンシューマーアプリケーションのワーカーは、リースを使用して特定のデータストリームからシャードを処理します。特定の時点でどのワーカーがどのシャードをリースしているかに関する情報は、リーステーブルに保存されます。リーステーブルは、KCL コンシューマーアプリケーションの実行中に、データストリームからの最新のシャード情報と同期を維持する必要があります。KCL は、コンシューマーアプリケーションのブートストラップ (コンシューマーアプリケーションの初期化時または再起動時)、および処理中のシャードが終了 (リシャーディング) に達するたびに、Kinesis Data Streams サービスから取得したシャード情報とリーステーブルを同期します。つまり、ワーカーまたは KCL コンシューマーアプリケーションは、最初のコンシューマーアプリケーションのブートストラップ中、およびコンシューマーアプリケーションでデータストリームリシャードイベントが発生するたびに、処理中のデータストリームと同期されます。

KCL 1.0 - 1.13 と KCL 2.0 - 2.2 での同期

KCL 1.0 - 1.13 および KCL 2.0 - 2.2 では、コンシューマーアプリケーションのブートストラップ、および各データストリームのリシャードイベント中に、KCL は、ListShards または DescribeStream 検出 API を呼び出して、Kinesis Data Streams サービスから取得したシャード情報とリーステーブルを同期します。上記のすべての KCL バージョンで、KCL コンシューマーアプリケーションの各ワーカーは、コンシューマーアプリケーションのブートストラップ中および各ストリームリシャードイベントでリース/シャード同期プロセスを実行するために、次の手順を完了します。

  • 処理中のストリームのデータのすべてのシャードをフェッチします。

  • リーステーブルからすべてのシャードリースをフェッチします。

  • リーステーブルにリースのないオープンシャードをフィルターで除外します。

  • 見つかったすべてのオープンシャードと、開いている親を持たない各オープンシャードについて反復処理します。

    • 階層ツリーをその祖先パスを通過して、シャードが子孫であるかどうかを判断します。祖先シャードが処理されている場合 (リーステーブルに祖先シャードのリースエントリが存在する場合)、または祖先シャードを処理する必要がある場合 (例えば、初期位置がTRIM_HORIZONまたはAT_TIMESTAMP)、シャードは子孫と見なされます。

    • コンテキスト内のオープンシャードが子孫である場合、KCL は初期位置に基づいてシャードをチェックポイントし、必要に応じて親のリースを作成します。

KCL 2.x での同期、KCL 2.3 以降で始まる

サポートされている最新バージョンの KCL 2.x (KCL 2.3) 以降では、ライブラリで同期プロセスに対する次の変更がサポートされるようになりました。これらのリース/シャード同期の変更により、KCL コンシューマーアプリケーションから Kinesis Data Streams サービスに対して実行される API コールの数が大幅に削減され、KCL コンシューマーアプリケーションのリース管理が最適化されます。

  • アプリケーションのブートストラップ中に、リーステーブルが空の場合、KCL はListShard API のフィルタリングオプション (ShardFilter オプションのリクエストパラメータ) を使用して、ShardFilter パラメータで指定された時間に開いているシャードのスナップショットに対してのみリースを取得および作成します。ShardFilter パラメーターを使用すると、ListShards API の応答をフィルターで除外できます。ShardFilter パラメータの唯一の必須プロパティは Type です。KCL は Type フィルタープロパティとその次の有効な値を使用して、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別して返します。

    • AT_TRIM_HORIZON - 応答には、TRIM_HORIZON で開いていたすべてのシャードが含まれます。

    • AT_LATEST - 応答には、データストリームの現在開いているシャードのみが含まれます。

    • AT_TIMESTAMP - 応答には、開始タイムスタンプが指定されたタイムスタンプ以下で、終了タイムスタンプが指定されたタイムスタンプ以上であるか、またはまだ開いているすべてのシャードが含まれます。

    ShardFilter は空のリーステーブルのリースを作成して、RetrievalConfig#initialPositionInStreamExtended で指定したシャードのスナップショットのリースを初期化するときに使用されます。

    の詳細については、ShardFilterを参照してください。https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html

  • すべてのワーカーがリース/シャード同期を実行して、データストリーム内の最新のシャードでリーステーブルを最新の状態に保つ代わりに、選択された単一のワーカーリーダーがリース/シャードの同期を実行します。

  • KCL 2.3 は、GetRecords および SubscribeToShard APIのリターンパラメータ ChildShards を使用して、閉じたシャードに対して SHARD_END で発生するリース/シャード同期を実行します。これにより、KCL ワーカーは、処理が終了したシャードの子シャードに対してのみリースを作成できます。コンシューマーアプリケーション全体で共有する場合、リース/シャード同期のこの最適化では GetRecords API の ChildShards パラメータを使用します。専用スループット (拡張ファンアウト) コンシューマーアプリケーションの場合、リース/シャード同期のこの最適化では SubscribeToShard API の ChildShards パラメータを使用します。詳細については、、およびを参照してくださいGetRecordsSubscribeToShardsChildShard

  • 上記の変更により、KCL の動作は、既存のすべてのシャードについて学習するすべてのワーカーのモデルから、各ワーカーが所有するシャードの子シャードについてのみ学習するワーカーのモデルに移行します。したがって、コンシューマーアプリケーションのブートストラップおよびリシャードイベント中に発生する同期に加えて、KCL は、リーステーブル内の潜在的なホールを特定するために、追加の定期的なシャード/リーススキャンを実行して (つまり、すべての新しいシャードについて学習する)、データストリームの完全なハッシュ範囲が処理されていることを確認し、必要に応じてそれらのリースを作成します。PeriodicShardSyncManagerは定期的なリース/シャードスキャンの実行を担当するコンポーネントです。

    KCL 2.3 の詳細についてはPeriodicShardSyncManagerhttps://github.com/awslabs/ amazon-kinesis-client /blob/master/ /src/main/java/software/amazon/kinesis/leases/ .java #L201-L213 amazon-kinesis-client を参照してください。LeaseManagementConfig

    KCL 2.3 では、新しい設定オプションを使用して、LeaseManagementConfigPeriodicShardSyncManager を設定できるようになりました。

    名前 デフォルト値 説明
    leasesRecoveryAuditorExecutionFrequencyMillis

    120000 (2 分)

    リーステーブルで部分的なリースをスキャンする監査ジョブの頻度 (ミリ単位)。監査者がストリームのリースの穴を検出すると、leasesRecoveryAuditorInconsistencyConfidenceThreshold に基づいてシャード同期がトリガーされます。

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    リーステーブル内のデータストリームのリースが矛盾しているかどうかを判断するための、定期的な監査ジョブの信頼しきい値。監査者がデータストリームに対して同じ不整合セットを何度も繰り返し検出すると、シャード同期がトリガーされます。

    CloudWatch の状態を監視するための新しいメトリクスも出力されるようになりました。PeriodicShardSyncManager詳細については、「PeriodicShardSyncManager」を参照してください。

  • HierarchicalShardSyncer への最適化を含めて、シャードの 1 つのレイヤーに対してのみリースを作成します。

KCL 1.x での同期、KCL 1.14 以降で始まる

サポートされている最新バージョンの KCL 1.x (KCL 1.14) 以降では、ライブラリで同期プロセスに対する次の変更がサポートされるようになりました。これらのリース/シャード同期の変更により、KCL コンシューマーアプリケーションから Kinesis Data Streams サービスに対して実行される API コールの数が大幅に削減され、KCL コンシューマーアプリケーションのリース管理が最適化されます。

  • アプリケーションのブートストラップ中に、リーステーブルが空の場合、KCL はListShard API のフィルタリングオプション (ShardFilter オプションのリクエストパラメータ) を使用して、ShardFilter パラメータで指定された時間に開いているシャードのスナップショットに対してのみリースを取得および作成します。ShardFilter パラメーターを使用すると、ListShards API の応答をフィルターで除外できます。ShardFilter パラメータの唯一の必須プロパティは Type です。KCL は Type フィルタープロパティとその次の有効な値を使用して、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別して返します。

    • AT_TRIM_HORIZON - 応答には、TRIM_HORIZON で開いていたすべてのシャードが含まれます。

    • AT_LATEST - 応答には、データストリームの現在開いているシャードのみが含まれます。

    • AT_TIMESTAMP - 応答には、開始タイムスタンプが指定されたタイムスタンプ以下で、終了タイムスタンプが指定されたタイムスタンプ以上であるか、またはまだ開いているすべてのシャードが含まれます。

    ShardFilter は空のリーステーブルのリースを作成して、KinesisClientLibConfiguration#initialPositionInStreamExtended で指定したシャードのスナップショットのリースを初期化するときに使用されます。

    の詳細については、ShardFilterを参照してください。https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html

  • すべてのワーカーがリース/シャード同期を実行して、データストリーム内の最新のシャードでリーステーブルを最新の状態に保つ代わりに、選択された単一のワーカーリーダーがリース/シャードの同期を実行します。

  • KCL 1.14 は、GetRecords および SubscribeToShard APIのリターンパラメータ ChildShards を使用して、閉じたシャードに対して SHARD_END で発生するリース/シャード同期を実行します。これにより、KCL ワーカーは、処理が終了したシャードの子シャードに対してのみリースを作成できます。詳細については、「」GetRecordsと「」を参照してください。ChildShard

  • 上記の変更により、KCL の動作は、既存のすべてのシャードについて学習するすべてのワーカーのモデルから、各ワーカーが所有するシャードの子シャードについてのみ学習するワーカーのモデルに移行します。したがって、コンシューマーアプリケーションのブートストラップおよびリシャードイベント中に発生する同期に加えて、KCL は、リーステーブル内の潜在的なホールを特定するために、追加の定期的なシャード/リーススキャンを実行して (つまり、すべての新しいシャードについて学習する)、データストリームの完全なハッシュ範囲が処理されていることを確認し、必要に応じてそれらのリースを作成します。PeriodicShardSyncManagerは定期的なリース/シャードスキャンの実行を担当するコンポーネントです。

    KinesisClientLibConfiguration#shardSyncStrategyTypeShardSyncStrategyType.SHARD_END に設定されると、PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold は、シャード同期を強制するために、リーステーブル内のホールを含む連続スキャンの数のしきい値を決定するために使用されます。KinesisClientLibConfiguration#shardSyncStrategyTypeShardSyncStrategyType.PERIODIC に設定されると、leasesRecoveryAuditorInconsistencyConfidenceThreshold は無視されます。

    KCL 1.14 の詳細については、https://github.com/awslabs/ PeriodicShardSyncManager amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ .java #L987 KinesisClientLibConfiguration-L999 を参照してください。

    KCL 1.14 では、新しい設定オプションを使用して、LeaseManagementConfigPeriodicShardSyncManager を設定できるようになりました。

    名前 デフォルト値 説明
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    リーステーブル内のデータストリームのリースが矛盾しているかどうかを判断するための、定期的な監査ジョブの信頼しきい値。監査者がデータストリームに対して同じ不整合セットを何度も繰り返し検出すると、シャード同期がトリガーされます。

    CloudWatch の状態を監視するための新しいメトリクスも出力されるようになりました。PeriodicShardSyncManager詳細については、「PeriodicShardSyncManager」を参照してください。

  • KCL 1.14 は、遅延リースのクリーンアップもサポートするようになりました。リースは、SHARD_END に到達したとき、シャードがデータストリームの保持期間を過ぎて期限切れになったとき、またはリシャーディングオペレーションの結果として閉じられたとき、LeaseCleanupManager により非同期的に削除されます。

    新しい設定オプションを使用して、LeaseCleanupManager を設定できるようになりました。

    名前 デフォルト値 説明
    leaseCleanupIntervalミリス

    1 分

    リースクリーンアップスレッドを実行する間隔。

    completedLeaseCleanupIntervalMillis 5 分

    リースが完了したかどうかをチェックする間隔。

    garbageLeaseCleanupIntervalMillis 30 分

    リースがガベージであるかどうかをチェックする間隔 (つまり、データストリームの保持期間を過ぎてトリミング)。

  • KinesisShardSyncer への最適化を含めて、シャードの 1 つのレイヤーに対してのみリースを作成します。

Java コンシューマーアプリケーションの同じ KCL 2.x で複数のデータストリームを処理する

このセクションでは、複数のデータストリームを同時に処理できる KCL コンシューマーアプリケーションを作成できる KCL 2.x for Java における以下の変更点について説明します。

重要

マルチストリーム処理は、Java 用 KCL 2.x でのみサポートされており、Java の場合は KCL 2.3 以降で始まります。

KCL 2.x を実装できる他の言語では、マルチストリーム処理はサポートされていません。

マルチストリーム処理は KCL 1.x のどのバージョンでもサポートされていません。

  • MultistreamTracker インターフェース

    複数のストリームを同時に処理できるコンシューマーアプリケーションを構築するには、という新しいインターフェースを実装する必要がありますMultistreamTracker。このインターフェースには、KCL コンシューマーアプリケーションによって処理されるデータストリームとその設定のリストを返す streamConfigList メソッドが含まれています。処理中のデータストリームは、コンシューマーアプリケーションのランタイム中に変更できることに注意してください。streamConfigListは、処理するデータストリームの変更について学習するために KCL によって定期的に呼び出されます。

    streamConfigListStreamConfigこのメソッドはリストにデータを入力します。

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    StreamIdentifier および InitialPositionInStreamExtended は必須フィールドですが、consumerArn は省略可能である点に注意してください。KCL 2.x を使用して拡張ファンアウトコンシューマーアプリケーションを実装する場合にのみ、consumerArn を提供する必要があります。

    の詳細についてはStreamIdentifierhttps://github.com/awslabs/ amazon-kinesis-client /blob/v2.5.8/ /src/main/java/software/amazon/kinesis/common/ amazon-kinesis-client .java #L129 を参照してください。StreamIdentifierを作成するにはStreamIdentifier、v2.5.0 以降で使用できるとからマルチストリームインスタンスを作成することをお勧めします。streamArn streamCreationEpochサポートされていない KCL v2.3 と v2.4 では、streamArmこの形式を使用してマルチストリームインスタンスを作成します。account-id:StreamName:streamCreationTimestampこの形式は次のメジャーリリースで廃止され、サポートされなくなります。

    MultistreamTracker には、リーステーブル内の古いストリームのリースを削除するための戦略も含まれます(formerStreamsLeasesDeletionStrategy)。コンシューマーアプリケーションのランタイム中は、ストラテジーを変更できないことに注意してください。詳細については、https://github.com/awslabs/ amazon-kinesis-client /blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ /src/main/java/software/amazon/kinesis/processor/.java amazon-kinesis-client を参照してください。FormerStreamsLeasesDeletionStrategy

  • ConfigsBuilderは、KCL コンシューマーアプリケーションの構築時に使用する KCL 2.x のすべての構成設定を指定するために使用できるアプリケーション全体のクラスです。 ConfigsBuilderクラスがインターフェースをサポートするようになりました。MultistreamTracker ConfigsBuilderどちらも、レコードを消費するデータストリームの名前で初期化できます。

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    また、複数のストリームを同時に処理する KCL MultiStreamTracker コンシューマーアプリケーションを実装したい場合は、 ConfigsBuilder で初期化することもできます。

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • KCL コンシューマーアプリケーションにマルチストリームサポートが実装されているため、アプリケーションのリーステーブルの各行に、このアプリケーションが処理する複数のデータストリームのシャード ID とストリーム名が含まれます。

  • KCL コンシューマーアプリケーションのマルチストリームサポートが実装されている場合、leaseKey は次の構造を取ります: account-id:StreamName:streamCreationTimestamp:ShardId。例えば 111111111:multiStreamTest-1:12345:shardId-000000000336 です。

    重要

    KCL コンシューマーアプリケーションが 1 つのデータストリームのみを処理する場合、リーステーブルのハッシュキー leaseKey はシャード ID です。この既存の KCL コンシューマーアプリケーションを複数のデータストリームを処理するように再構成すると、リーステーブルが壊れます。マルチストリームサポートでは LeaseKey 構造体は次のようになっている必要があるためです: account-id:StreamName:StreamCreationTimestamp:ShardId

Kinesis クライアントライブラリと AWS Glue スキーマレジストリの使用

Kinesis データストリームを AWS Glue スキーマレジストリと統合できます。 AWS Glue スキーマレジストリを使用すると、スキーマを一元的に検出、制御、および進化させながら、生成されたデータが登録されたスキーマによって継続的に検証されるようにできます。スキーマは、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。 AWS Glue Schema Registry を使用すると、 end-to-end ストリーミングアプリケーション内のデータ品質とデータガバナンスを向上させることができます。詳細については、AWS Glue スキーマレジストリを参照してください。この統合を設定する方法の 1 つは、Java で KCL を使用することです。

重要

現在、Kinesis Data Streams と AWS Glue スキーマのレジストリ統合は、Java で実装された KCL 2.3 コンシューマーを使用する Kinesis データストリームでのみサポートされています。多言語サポートは提供されていません。KCL 1.0 コンシューマーはサポートされていません。KCL 2.3 より前の KCL 2.x コンシューマーはサポートされていません。

KCL を使用して KKinesis Data Streams とスキーマレジストリの統合を設定する方法の詳細については、「ユースケース:Amazon Kinesis データストリームと Glue スキーマレジストリの統合」の「KPL/KCL ライブラリを使用したデータの操作」セクションを参照してください。 AWS