翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
注記
Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL バージョン 3.x に移行することをお勧めします。これにより、パフォーマンスと新機能が向上します。最新の KCL ドキュメントと移行ガイドについては、「」を参照してくださいKinesis Client Library を使用する。
Kinesis Data Streams のデータを処理するアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Python について説明します。
KCL は Java ライブラリであり、Java 以外の言語のサポートは、MultiLangDaemon と呼ばれる多言語インターフェースを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。そのため、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定がいくつかあります。GitHub の MultiLangDaemon の詳細については、KCL MultiLangDaemon project
GitHub から Python KCL をダウンロードするには、Kinesis Client Library (Python)
Python で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。
RecordProcessor クラスのメソッドを実装する
RecordProcess
クラスでは、RecordProcessorBase
を拡張して次のメソッドを実装する必要があります。このサンプルでは、開始点として使用できる実装を提供しています (sample_kclpy_app.py
を参照してください)。
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
initialize
KCL は、レコードプロセッサがインスタンス化されると、initialize
メソッドを呼び出し、特定のシャード ID をパラメータとして渡します。このレコードプロセッサはこのシャードのみを処理し、通常、その逆も真です (このシャードはこのレコード プロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。これは、Kinesis Data Streams は少なくとも 1 回のセマンティクスを使用しているからです。つまり、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。を参照してください。
def initialize(self, shard_id)
process_records
KCL は、このメソッドを呼び出し、initialize
メソッドで指定されたシャードのデータレコードのリストを渡します。実装するレコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。例えば、ワーカーはデータの変換を実行し、その結果を Amazon Simple Storage Service (Amazon S3) バケットに保存する場合があります。
def process_records(self, records, checkpointer)
データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれます。ワーカーはデータを処理するときに、これらの値を使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。record
ディクショナリは、レコードのデータ、シーケンス番号、およびパーティションキーにアクセスする次のキーと値のペアを公開します。
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
データは Base64 でエンコードされていることに注意してください。
サンプルでは、メソッド process_records
に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。
Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、Checkpointer
オブジェクトを process_records
に渡すことで、この追跡をユーザーに代わって処理します。レコードプロセッサは、このオブジェクトの checkpoint
メソッドを呼び出して、シャード内のレコードの処理の進行状況を KCL に通知します。ワーカーでエラーが発生すると、KCL はこの情報を使用して、処理されたことが分かっている最後のレコードからシャードの処理を再開します。
分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサが checkpoint
を呼び出して元のシャードの処理がすべて完了したことを通知するまで、新しいシャードの処理を開始しません。
パラメータを渡さないと、checkpoint
への呼び出しは、レコードプロセッサに最後のレコードを渡した時点までのすべてのレコードが処理済みであることを意味すると KCL で見なされます。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合にのみ、checkpoint
を呼び出す必要があります。レコードプロセッサは、checkpoint
の各呼び出しで process_records
を呼び出す必要はありません。たとえば、プロセッサは、3 回呼び出すたびに、checkpoint
を呼び出すことができます。オプションでレコードの正確なシーケンス番号をパラメータとして checkpoint
に指定できます。この場合、KCL は、そのレコードまでのすべてのレコードだけが処理されたと見なします。
サンプルでは、プライベートメソッド checkpoint
で、適切な例外処理と再試行のロジックを使用する Checkpointer.checkpoint
メソッドを呼び出す方法を示しています。
KCL は、process_records
を使用して、データレコードの処理から発生するすべての例外を処理します。例外が process_records
からスローされた場合、 は、例外発生前に に渡されたデータレコードをスキップします。つまり、これらのレコードは、例外をスローしたレコードプロセッサ、またはコンシューマーの他のレコードプロセッサに再送信されません。
shutdown
KCL は、処理が終了した場合 (シャットダウンの理由は TERMINATE
) またはワーカーが応答していない場合 (シャットダウンの reason
は ZOMBIE
)、shutdown
メソッドを呼び出します。
def shutdown(self, checkpointer, reason)
シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。
また、KCL は、Checkpointer
オブジェクトも shutdown
に渡します。シャットダウンの reason
が TERMINATE
である場合、レコードプロセッサはすべてのデータレコードの処理を終了し、このインターフェイスの checkpoint
メソッドを呼び出します。
設定プロパティを変更する
このサンプルでは、設定プロパティのデフォルト値を提供します。これらのプロパティを独自の値にオーバーライドできます (sample.properties
を参照してください)。
アプリケーション名
KCL には、複数のアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーションが必要です。次のようにアプリケーション名の設定値を使用します。
-
このアプリケーション名と関連付けられたワーカーはすべて、同じストリーム上で連携して処理しているとみなされます。これらのワーカーは複数のインスタンスに分散している場合があります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。
-
KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーとシャードのマッピングなど) を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡するを参照してください。
認証情報の設定
デフォルトの AWS 認証情報プロバイダーチェーンの認証情報プロバイダーの 1 つが認証情報を使用できるようにする必要があります。AWSCredentialsProvider
プロパティを使用して認証情報プロバイダーを設定できます。sample.properties
サンプルのプロパティファイルでは、 で指定されているレコードプロセッサを使用してwordsという Kinesis data stream を処理するように KCL を設定します。