Kinesisクライアントライブラリの使用 - Amazon Kinesis Data Streams

「翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。」

Kinesisクライアントライブラリの使用

KDSデータ・ストリームからデータを処理できるカスタム・コンシューマ・アプリケーションを開発する方法の1つは、 Kinesis Client Library (KCL)。

Kinesis Client Library とは

KCL は、分散コンピューティングに関連する複雑なタスクの多くを処理することで、Kinesisデータストリームからのデータの消費と処理を支援します。これには、複数のコンシューマ・アプリケーション・インスタンス間でのロード・バランシング、コンシューマ・アプリケーション・インスタンスの障害への対応、処理済みレコードのチェックポイント作成、リシャーディングへの対応が含まれます。[ KCL では、これらのすべてのサブタスクを処理して、カスタム記録処理ロジックの作成に集中できるようにします。

KCLはKinesisデータストリームとは異なります APIs AWSで利用可能 SDKs. キネシスデータストリーム APIs は、ストリームの作成、リシャーディング、レコードの格納と取得など、Kinesis Data Stream のさまざまな側面の管理に役立ちます。KCLは、これらのすべてのサブタスクの抽象化レイヤーを提供し、特に、コンシューマアプリケーションのカスタムデータ処理ロジックに集中できるようにします。Kinesis Data Streams API の詳細については、 Amazon Kinesis APIリファレンス.

重要

KCLはJavaライブラリです。Java以外の言語のサポートは、 MultiLangDaemon. このデーモンはJavaベースであり、Java以外のKCL言語を使用している場合、バックグラウンドで実行されます。たとえば、Python用KCLをインストールし、Pythonに完全にコンシューマアプリケーションを書き込む場合、 MultiLangDaemon. また、 MultiLangDaemon には、使用例用にカスタマイズする必要があるいくつかのデフォルト設定があります。たとえば、接続先のAWS地域などです。[ MultiLangDaemon の GitHub、参照 ケーシーエル MultiLangDaemon プロジェクト.

KCL は、レコード処理ロジックと Kinesis Data Streams の仲介として機能します。KCL は、次のタスクを実行します。

  • データストリームに接続

  • データストリーム内のシャードを列挙する

  • リースを使用して、労働者とのシャード関連を調整する

  • レコードプロセッサで管理する各シャードのレコードプロセッサをインスタンス化する

  • データ ストリームからデータ レコードをプルします。

  • 対応するレコードプロセッサにレコードを送信する

  • 処理されたレコードのチェックポイントを作成する

  • ワーカー インスタンス数の変更時、またはデータ ストリームが再シャード化 (シャードが分割またはマージされる) されたときに、シャードとワーカーの関連付け (リース) のバランスを取ります。

KCL 利用可能なバージョン

現在、次のいずれかのサポートされているバージョンのKCLを使用して、カスタム コンシューマ アプリケーションを構築できます。

KCL 1.xまたはKCL 2.xを使用して、共有スループットを使用するコンシューマ アプリケーションを構築できます。詳細については、KCL を使用したスループット共有カスタムコンシューマーの開発 を参照してください。

専用スループット(ファンアウト・コンシューマの拡張)を使用するコンシューマ・アプリケーションを構築するには、KCL 2.xのみを使用できます。詳細については、スループット専有 (拡張ファンアウト) カスタムコンシューマーの開発 を参照してください。

KCL 1.x と KCL 2.x の違い、および KCL 1.x から KCL 2.x に移行する方法については、以下を参照してください。 コンシューマーを KCL 1.x から KCL 2.x に移行する.

KCL の概念

  • KCL消費者アプリケーション – KCLを使用してカスタムビルドされ、データストリームからレコードを読み取り、処理するように設計されたアプリケーション。

  • コンシューマ アプリケーション インスタンス - KCLコンシューマ・アプリケーションは通常、分散されており、1つ以上のアプリケーション・インスタンスが同時に実行され、障害の調整と動的なロード・バランシング・データ・レコード処理が行われます。

  • 労働者 – KCLコンシューマアプリケーションインスタンスがデータ処理を開始するために使用する高レベルクラス。

    重要

    各KCLコンシューマ・アプリケーション・インスタンスには、1人のワーカーがあります。

    シャードとリース情報の同期、シャードの割り当ての追跡、シャードからのデータの処理など、さまざまなタスクを初期化して監視します。ワーカーは、このKCLコンシューマ・アプリケーションが処理するデータ・レコードのデータ・ストリームの名前や、このデータ・ストリームにアクセスするために必要なAWS認証情報など、コンシューマ・アプリケーションの構成情報をKCLに提供します。また、特定のKCLコンシューマ・アプリケーション・インスタンスを起動して、データ・ストリームからレコード・プロセッサにデータ・レコードを配信します。

    重要

    KCL 1.x では、このクラスは と呼ばれます。 労働者. 詳細については、(Java KCLリポジトリ)を参照してください。 https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java. KCL 2.x では、このクラスは と呼ばれます。 スケジューラー. KCL 2.x のスケジューラーの目的は、KCL 1.x のワーカーの目的と同じです。KCL 2.xのスケジューラクラスについての詳細は、「 https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java.

  • リース – ワーカーとシャード間の結合を定義するデータ。分散KCLコンシューマ・アプリケーションは、リースを使用して、データ・レコード処理を一連のワーカー間でパーティション化します。どの時点でも、データ記録の各断片は、 leaseKey 変数。

    デフォルトで、ワーカーは1つまたは複数のリースを保持できます( maxLeasesForWorker 変数)を同時に設定します。

    重要

    すべての労働者は、データストリームにあるすべての利用可能なシャードについて、利用可能なすべてのリースを保持するよう努力します。ただし、一度に各リースを正常に保持できるのは、1人の労働者のみです。

    たとえば、4個のシャードを持つデータ ストリームを処理するワーカーAのコンシューマ アプリケーション インスタンスAがある場合、ワーカーAは、シャード1、2、3、および4のリースを同時に保持できます。ただし、コンシューマ アプリケーション インスタンスが2つある場合は、ワーカーAとワーカーBのAとB。これらのインスタンスは、4つのシャードを持つデータストリームを処理しています。ワーカーAとワーカーBは、リースをシャード1に同時に保持することはできません。1人のワーカーが、このシャードデータレコードの処理を停止する準備が整うまで、または処理が失敗するまで、特定のシャードに対するリースを保持します。一人の労働者がリースの保留を止めた場合、別の労働者がリースを引き受けて保留します。

    詳細については、(Java KCLリポジトリ)を参照してください。 https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java KCL 1.x および https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java KCL 2.x用。

  • リース表 - ユニークなAmazon DynamoDB KCLコンシューマアプリケーションのワーカーがリースおよび処理しているKDSデータストリーム内のシャードを追跡するために使用するテーブル。リース・テーブルは、KCLコンシューマ・アプリケーションの実行中は、データ・ストリームから最新のシャード情報と(ワーカー内およびすべてのワーカー間で)同期した状態を維持する必要があります。詳細については、リーステーブルを使用して、KCL消費者アプリケーションによって処理されたシャードを追跡する を参照してください。

  • 記録処理者 – KCLコンシューマ アプリケーションがデータ ストリームから取得するデータを処理する方法を定義するロジック。実行時に、KCLコンシューマアプリケーションインスタンスはワーカーをインスタンス化し、このワーカーはリースを保有するすべてのシャードに対して1つのレコードプロセッサをインスタンス化します。

リーステーブルを使用して、KCL消費者アプリケーションによって処理されたシャードを追跡する

リーステーブルとは

各 Amazon Kinesis Data Streams application、 KCL は、一意のリーステーブル( Amazon DynamoDB 表)を使用して、KCLコンシューマアプリケーションのワーカーがリースおよび処理しているKDSデータストリーム内のシャードを追跡します。

重要

KCL は、コンシューマ アプリケーションの名前を使用して、このコンシューマ アプリケーションが使用するリース テーブルの名前を作成します。したがって、各コンシューマ アプリケーション名は一意である必要があります。

リーステーブルは、 Amazon DynamoDB コンソール 消費者アプリケーションの実行中。

アプリケーションの起動時にKCLコンシューマ アプリケーションのリース テーブルが存在しない場合、いずれかのワーカーがこのアプリケーションのリース テーブルを作成します。

重要

アカウントには、Kinesis Data Streams 自体に関連するコストに加えて、DynamoDB テーブルに関連するコストが発生します。

リーステーブルの各行は、コンシューマアプリケーションのワーカーが処理中のシャードを表します。リーステーブルのハッシュキーは、 leaseKey、これはシャード ID です。

シャード ID に加えて、各行には次のデータが含まれます。

  • チェックポイント: シャードの最新のチェックポイント シーケンス番号。この値は、データ ストリーム内のすべてのシャードで一意です。

  • checkpointSubSequenceNumber: Kinesis Producer Libraryのアグリゲーション機能を使用する場合、これは チェックポイント これは、 Kinesis 記録。

  • leaseCounter: リースのバージョン管理に使用され、労働者が自分のリースが別の労働者によって取得されたことを検出できるようにします。

  • leaseKey: リースの一意の識別子。各リースは、データ・ストリーム内のシャードに特化しており、一度に1人のワーカーが保有します。

  • leaseOwner: このリースを保有する労働者。

  • ownerSwitchesSinceCheckpoint: 前回チェックポイントが書き込まれてから、このリースによってワーカーが変更された回数。

  • parentShardId: 子シャードの処理を開始する前に、親シャードが完全に処理されるようにするために使用されます。これにより、レコードがストリームに入力されたのと同じ順序で処理されるようになります。

  • ハッシュ範囲: によって使用 PeriodicShardSyncManager 定期的な同期を実行して、リーステーブルで不足しているシャードを見つけ、必要に応じてリースを作成します。

    注記

    このデータは、KCL 1.14 および KCL 2.3 で始まるすべてのシャードのリース表にあります。詳細情報については、 PeriodicShardSyncManager また、リースとシャード間の定期的な同期については、 KDSデータストリーム内のシャードとリーステーブルを同期する方法.

  • チャイルドシャード: によって使用 LeaseCleanupManager 子シャードの処理ステータスを確認し、親シャードをリーステーブルから削除できるかどうかを決定します。

    注記

    このデータは、KCL 1.14 および KCL 2.3 で始まるすべてのシャードのリース表にあります。

Throughput

Amazon Kinesis Data Streams application でプロビジョニングされたスループットの例外が発生した場合は、DynamoDB テーブルのプロビジョニングされたスループットを増やす必要があります。KCL がテーブルを作成するときにプロビジョニングされるスループットは、1 秒あたりの読み込み 10 回、1 秒あたりの書き込み 10 回ですが、これがユーザーのアプリケーションで十分でない場合があります。たとえば、Amazon Kinesis Data Streams applicationが頻繁にチェックポイントを作成する場合や、多くのシャードで構成されるストリームを処理する場合は、より多くのスループットが必要になる可能性があります。

プロビジョニングされたスループットの詳細については、 DynamoDB、参照 読み取り/書き込み容量モード および テーブルとデータの操作Amazon DynamoDB 開発者ガイド.

KDSデータストリーム内のシャードとリーステーブルを同期する方法

KCL消費者アプリケーションのワーカーは、特定のデータストリームからのシャードを処理するためにリースを使用します。いつ何のシャードをリースするかについての情報は、リーステーブルに保管されます。リース・テーブルは、KCLコンシューマ・アプリケーションの実行中は、データ・ストリームからの最新のシャード情報と同期したままにする必要があります。KCL は、コンシューマアプリケーションブートストラッピング中 (コンシューマアプリケーションが初期化または再起動された場合) および処理中のシャードが終了 (リシャーディング) に達したときに、Kinesis Data Streams サービスから取得したシャード情報とリーステーブルを同期します。言い換えれば、ワーカーまたはKCLコンシューマ・アプリケーションは、コンシューマ・アプリケーションの初期ブートストラップ中、およびコンシューマ・アプリケーションがデータ・ストリーム・リシャード・イベントに遭遇したときに処理しているデータ・ストリームと同期されます。

KCL 1.0 - 1.13およびKCL 2.0 - 2.2での同期

KCL 1.0~1.13およびKCL 2.0~2.2では、コンシューマアプリケーションのブートストラップ中および各データストリームのリシャードイベント中、KCLは、 ListShards または DescribeStream 発見 APIs. 上記のすべてのKCLバージョンでは、KCLコンシューマ アプリケーションの各ワーカーは、コンシューマ アプリケーションのブートストラップ中および各ストリームのreshardイベントでリース/シャード同期プロセスを実行するために、次の手順を実行します。

  • 処理中のストリームのデータのすべてのシャードを取得します。

  • リーステーブルからすべてのシャードリースを取得

  • リーステーブルにリースのないオープンシャードをフィルタで除外します

  • 見つかったすべてのオープンシャードと、オープン親のないオープンシャードを反復します。

    • 階層ツリーの先祖パスを横断して、シャードが子孫であるかどうかを判断します。シャードは子孫と見なされます。親のシャードが処理されている場合(親のシャードのリースエントリがリーステーブルに存在する場合)、または親のシャードが処理されるべき場合(たとえば、最初の位置が TRIM_HORIZON 又は AT_TIMESTAMP)

    • コンテキストで開いたシャードが子孫の場合、KCLは最初の位置に基づいてシャードをチェックポイントし、必要に応じて親のリースを作成します

KCL 2.xの同期、KCL 2.3以降から開始

KCL 2.x (KCL 2.3) 以降の最新のサポート バージョンから、ライブラリは同期プロセスに以下の変更をサポートするようになりました。これらのリース/シャード同期の変更により、KCLコンシューマ・アプリケーションからKinesis Data StreamsサービスへのAPI呼び出しの数が大幅に削減され、KCLコンシューマ・アプリケーションのリース管理が最適化されます。

  • アプリケーションのブートストラップ中、リーステーブルが空の場合、KCLは ListShard APIのフィルタリングオプション( ShardFilter オプションのリクエストパラメータ) を使用して、 ShardFilter パラメータ。[ ShardFilter パラメータを使用すると、 ListShards API。唯一の必要なプロパティは、 ShardFilter パラメータは Type. KCLは Type 物件をフィルタリングし、その有効な値を以下のものにして、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別し、返します。

    • AT_TRIM_HORIZON - 応答には、 で開いているすべてのシャードが含まれます。 TRIM_HORIZON.

    • AT_LATEST - 応答には、現在開いているデータストリームのシャードのみが含まれます。

    • AT_TIMESTAMP - 応答には、開始タイムスタンプが指定されたタイムスタンプ以下であり、終了タイムスタンプが指定されたタイムスタンプ以上または開いているすべてのシャードが含まれます。

    ShardFilter は、次の場所で指定したシャードのスナップショットのリースを初期化するために、空のリーステーブルのリースを作成するときに使用されます。 RetrievalConfig#initialPositionInStreamExtended.

    詳細情報については、 ShardFilter、参照 https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • リース/シャード同期を実行するすべてのワーカーが、データストリーム内の最新のシャードでリーステーブルを最新の状態に維持する代わりに、1人の選出されたワーカーリーダーがリース/シャード同期を実行します。

  • KCL 2.3では、 ChildShards return パラメーター GetRecords そして SubscribeToShard APIs リース/シャードの同期を SHARD_END 閉じたシャードの場合、KCL労働者は、処理を終了したシャードの子シャードのリースのみを作成できます。コンシューマ・アプリケーション全体で共有する場合、このリース/シャード同期の最適化では、 ChildShards のパラメータ GetRecords API。専用スループット(拡張ファンアウト)のコンシューマ・アプリケーションでは、リース/シャード同期のこの最適化では、 ChildShards のパラメータ SubscribeToShard API。詳細については、以下を参照してください。 GetRecordsSubscribeToShards、および ChildShard.

  • 上記の変更に伴い、KCLの行動は、すべての既存のシャードについて学習するすべての労働者のモデルから、各労働者が所有するシャードの子供用シャードについてのみ学習する労働者のモデルに移行しています。したがって、コンシューマ・アプリケーションのブートストラップとリシャード・イベント中に発生する同期に加えて、KCLは、リース・テーブル内の潜在的な穴(つまり、すべての新しいシャードについて学習するため)を特定し、データ・ストリームの完全なハッシュ範囲が処理され、必要に応じてリースを作成するため、定期的なシャード/リース・スキャンも追加します。PeriodicShardSyncManager は、定期的なリース/シャードスキャンを実行するコンポーネントです。

    詳細情報については、 PeriodicShardSyncManager KCL 2.3で参照 https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L201-L213.

    KCL 2.3では、新しい構成オプションが利用可能です。 PeriodicShardSyncManager () LeaseManagementConfig:

    Name デフォルト値 説明
    leasesRecoveryAuditorExecutionFrequencyMillis

    120000(2分)

    リーステーブルの一部リースをスキャンするための監査人ジョブの頻度(ミリ単位)。監査人がストリームのリースに穴を検出した場合、 leasesRecoveryAuditorInconsistencyConfidenceThreshold.

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    定期監査人ジョブの信頼度しきい値。リーステーブル内のデータストリームのリースが矛盾しているかどうかを判断します。監査人がこのデータ ストリームに対して同じ不整合を何度も連続して検出すると、シャード同期がトリガーされます。

    新規 CloudWatch また、 PeriodicShardSyncManager. 詳細については、以下を参照してください。 PeriodicShardSyncManager.

  • 最適化を含む HierarchicalShardSyncer シャードの 1 層のリースのみを作成します。

KCL 1.xの同期、KCL 1.14以降から開始

KCL 1.x (KCL 1.14) 以降の最新のサポート バージョンから、ライブラリは同期プロセスに以下の変更をサポートするようになりました。これらのリース/シャード同期の変更により、KCLコンシューマ・アプリケーションからKinesis Data StreamsサービスへのAPI呼び出しの数が大幅に削減され、KCLコンシューマ・アプリケーションのリース管理が最適化されます。

  • アプリケーションのブートストラップ中、リーステーブルが空の場合、KCLは ListShard APIのフィルタリングオプション( ShardFilter オプションのリクエストパラメータ) を使用して、 ShardFilter パラメータ。[ ShardFilter パラメータを使用すると、 ListShards API。唯一の必要なプロパティは、 ShardFilter パラメータは Type. KCLは Type 物件をフィルタリングし、その有効な値を以下のものにして、新しいリースを必要とする可能性のあるオープンシャードのスナップショットを識別し、返します。

    • AT_TRIM_HORIZON - 応答には、 で開いているすべてのシャードが含まれます。 TRIM_HORIZON.

    • AT_LATEST - 応答には、現在開いているデータストリームのシャードのみが含まれます。

    • AT_TIMESTAMP - 応答には、開始タイムスタンプが指定されたタイムスタンプ以下であり、終了タイムスタンプが指定されたタイムスタンプ以上または開いているすべてのシャードが含まれます。

    ShardFilter は、次の場所で指定したシャードのスナップショットのリースを初期化するために、空のリーステーブルのリースを作成するときに使用されます。 KinesisClientLibConfiguration#initialPositionInStreamExtended.

    詳細情報については、 ShardFilter、参照 https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • リース/シャード同期を実行するすべてのワーカーが、データストリーム内の最新のシャードでリーステーブルを最新の状態に維持する代わりに、1人の選出されたワーカーリーダーがリース/シャード同期を実行します。

  • KCL 1.14は、 ChildShards return パラメーター GetRecords そして SubscribeToShard APIs リース/シャードの同期を SHARD_END 閉じたシャードの場合、KCL労働者は、処理を終了したシャードの子シャードのリースのみを作成できます。詳細については、「GetRecords」および「ChildShard」を参照してください。

  • 上記の変更に伴い、KCLの行動は、すべての既存のシャードについて学習するすべての労働者のモデルから、各労働者が所有するシャードの子供用シャードについてのみ学習する労働者のモデルに移行しています。したがって、コンシューマ・アプリケーションのブートストラップとリシャード・イベント中に発生する同期に加えて、KCLは、リース・テーブル内の潜在的な穴(つまり、すべての新しいシャードについて学習するため)を特定し、データ・ストリームの完全なハッシュ範囲が処理され、必要に応じてリースを作成するため、定期的なシャード/リース・スキャンも追加します。PeriodicShardSyncManager は、定期的なリース/シャードスキャンを実行するコンポーネントです。

    いつ KinesisClientLibConfiguration#shardSyncStrategyType は に設定されています ShardSyncStrategyType.SHARD_ENDPeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold は、リーステーブルに穴を含む連続スキャンの数のしきい値を決定し、その後にシャード同期を実行します。いつ KinesisClientLibConfiguration#shardSyncStrategyType は に設定されています ShardSyncStrategyType.PERIODICleasesRecoveryAuditorInconsistencyConfidenceThreshold は無視されます。

    詳細情報については、 PeriodicShardSyncManager KCL 1.14で参照 https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L987-L999.

    KCL 1.14では、新しい構成オプションが使用可能 PeriodicShardSyncManager () LeaseManagementConfig:

    Name デフォルト値 説明
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    定期監査人ジョブの信頼度しきい値。リーステーブル内のデータストリームのリースが矛盾しているかどうかを判断します。監査人がこのデータ ストリームに対して同じ不整合を何度も連続して検出すると、シャード同期がトリガーされます。

    新規 CloudWatch また、 PeriodicShardSyncManager. 詳細については、以下を参照してください。 PeriodicShardSyncManager.

  • また、KCL 1.14 は繰延リースのクリーンアップもサポートしています。リースは によって非同期的に削除されます LeaseCleanupManager 手を伸ばした時に SHARD_END、シャードがデータ ストリームの保存期間を過ぎた場合、またはリシャーディング操作の結果として閉じられた場合。

    構成する新しい構成オプションが使用可能 LeaseCleanupManager.

    Name デフォルト値 説明
    leaseCleanupIntervalMillis

    1 分

    リースクリーンアップスレッドを実行する間隔。

    completedLeaseCleanupIntervalMillis 5 分

    リースが完了したかどうかをチェックする間隔。

    garbageLeaseCleanupIntervalMillis 30 分

    リースがガベージかどうかをチェックする間隔(つまり、データ・ストリームの保持期間を過ぎて切り取られたかどうか)。

  • 最適化を含む KinesisShardSyncer シャードの 1 層のリースのみを作成します。