翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Python での Kinesis クライアントライブラリコンシューマーの開発
Kinesis Client Library (KCL) を使用して、Kinesis Data Streams からデータを処理するアプリケーションを構築できます。Kinesis クライアントライブラリは、複数の言語で使用できます。このトピックでは、Python について説明します。
KCLはJavaライブラリである。Java以外の言語のサポートは、MultiLangデーモン。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。そのため、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangデーモン。さらに、MultiLangデーモンには、ユースケースに合わせてカスタマイズする必要のあるデフォルト設定例があります。AWS接続先のリージョン。の詳細については、「」を参照してくださいMultiLangデーモンオンGitHubを参照のことKCLMultiLangデーモンプロジェクト
Python KCL をからダウンロードするにはGitHubの参照先情報Kinesis クライアントライブラリ (Python)
Python で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。
の実装方法RecordProcessorクラスメソッド
RecordProcess
クラスでは、RecordProcessorBase
を拡張して次のメソッドを実装する必要があります。このサンプルでは、開始点として使用できる実装を提供しています (sample_kclpy_app.py
を参照してください)。
def initialize(self, shard_id)
def process_records(self, records, checkpointer)
def shutdown(self, checkpointer, reason)
initialize
KCLは、initialize
メソッドレコードプロセッサがインスタンス化されると、メソッドで、特定のシャード ID をパラメータとして渡します。このレコードプロセッサはこのシャードのみを処理し、通常、その逆も真です (このシャードはこのレコード プロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。これは、Kinesis Data Streams には少なくとも 1 回はセマンティクスとは、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、「リシャーディング、拡張、並列処理」を参照してください。
def initialize(self, shard_id)
process_records
KCL は、このメソッドを呼び出し、で指定されたシャードのデータレコードのリストを渡します。initialize
方法。実装するレコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。たとえば、ワーカーは、データ変換を実行した後、Amazon Simple Storage Service (Amazon S3) バケットに結果を格納できます。
def process_records(self, records, checkpointer)
データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれます。ワーカーはデータを処理するときに、これらの値を使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。record
ディクショナリは、レコードのデータ、シーケンス番号、およびパーティションキーにアクセスする次のキーと値のペアを公開します。
record.get('data')
record.get('sequenceNumber')
record.get('partitionKey')
データは Base64 でエンコードされていることに注意してください。
サンプルでは、メソッド process_records
に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。
Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、Checkpointer
参照先オブジェクトprocess_records
。レコードプロセッサは、checkpoint
このオブジェクトでメソッドを使用すると、シャード内のレコードの処理の進行状況をKCL に知らせます。ワーカーでエラーが発生すると、KCL はこの情報を使用して、処理されたことが分かっている最後のレコードからシャードの処理を再開します。
分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサがcheckpoint
元のシャードのすべての処理が完了したことを通知します。
パラメータを渡さない場合、KCL は、checkpoint
は、レコードプロセッサに最後のレコードを渡した時点までのすべてのレコードが処理済みであることを意味します。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合にのみ、checkpoint
を呼び出す必要があります。レコードプロセッサは、checkpoint
の各呼び出しで process_records
を呼び出す必要はありません。たとえば、プロセッサは、3 回呼び出すたびに、checkpoint
を呼び出すことができます。オプションでレコードの正確なシーケンス番号をパラメータとして checkpoint
に指定できます。この場合、KCL は、すべてのレコードがそのレコードまで処理されたと見なします。
サンプルでは、プライベートメソッド checkpoint
で、適切な例外処理と再試行のロジックを使用する Checkpointer.checkpoint
メソッドを呼び出す方法を示しています。
KCLは依存しているprocess_records
を使用して、データレコードの処理で発生するすべての例外を処理します。から例外がスローされた場合process_records
の場合、KCL は、に渡されたデータレコードをスキップします。process_records
例外の前に. つまり、これらのレコードは、例外をスローしたレコードプロセッサ、またはコンシューマーの他のレコードプロセッサに再送信されません。
shutdown
KCLは、shutdown
処理が終了したときの方法(シャットダウンの理由はTERMINATE
) またはワーカーが応答していない場合 (シャットダウン)reason
ですZOMBIE
).
def shutdown(self, checkpointer, reason)
シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。
KCLも渡すCheckpointer
参照先オブジェクトshutdown
。シャットダウンの reason
が TERMINATE
である場合、レコードプロセッサはすべてのデータレコードの処理を終了し、このインターフェイスの checkpoint
メソッドを呼び出します。
設定プロパティを変更する
このサンプルでは、設定プロパティのデフォルト値を提供します。これらのプロパティを独自の値にオーバーライドできます (sample.properties
を参照してください)。
アプリケーション名
KCL には、ユーザーのアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーション名が必要です。次のようにアプリケーション名の設定値を使用します。
-
このアプリケーション名と関連付けられたワーカーはすべて、同じストリーム上で連携して処理しているとみなされます。これらのワーカーは複数のインスタンスに分散している場合があります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。
-
KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーシャードマッピングなど) を維持します。各アプリケーションには、それぞれの DynamoDB テーブルがあります。詳細については、「リーステーブルを使用して KCL コンシューマアプリケーションによって処理されたシャードを追跡する」を参照してください。
認証情報を設定する
あなたは自分のものを作らなければなりませんAWSデフォルトの認証情報プロバイダーチェーンのいずれかの認証情報プロバイダーに対して、使用可能な認証情報。AWSCredentialsProvider
プロパティを使用して認証情報プロバイダーを設定できます。sample.properties
サンプルのプロパティファイルでは、で指定されているレコードプロセッサを使用して「words」という Kinesis データストリームを処理するように KCL を設定します。sample_kclpy_app.py
。