メニュー
Amazon Kinesis Data Streams
開発者ガイド

Python での Kinesis Client Library コンシューマーの開発

Kinesis Client Library(KCL)を使用して Kinesis data stream からデータを処理するアプリケーションを構築できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Python について説明します。

KCL は Java ライブラリです。Java 以外の言語のサポートは、MultiLangDaemon という多言語インターフェイスを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用するときに実行されます。 そのため、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなどの、ユースケースに合わせてカスタマイズする必要のあるデフォルト設定例があります。MultiLangDaemon の詳細については https://github.com/awslabs/amazon-kinesis-client/tree/master/src/main/java/com/amazonaws/services/kinesis/multilangGitHub の KCL MultiLangDaemon プロジェクトのページを参照してください。

GitHub から Python KCL をダウンロードするには、「Kinesis Client Library (Python)」にアクセスしてください。 Python KCL コンシューマーアプリケーションのサンプルコードをダウンロードするには、GitHub で「KCL for Python sample project」にアクセスしてください。

Python で KCLコンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

RecordProcessor クラスのメソッドを実装する

RecordProcess クラスでは、RecordProcessorBase を拡張して次のメソッドを実装する必要があります。このサンプルでは、開始点として使用できる実装を提供しています(sample_kclpy_app.py を参照してください)。

def initialize(self, shard_id) def process_records(self, records, checkpointer) def shutdown(self, checkpointer, reason)

initialize

KCL は、レコードプロセッサがインスタンス化されたときに、パラメータとして特定のシャード ID を渡して、initialize メソッドを呼び出します。このレコードプロセッサはこのシャードのみを処理し、通常、その逆も真です(このシャードはこのレコード プロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。これは、Kinesis Data Streams では「少なくとも 1 回」のセマンティクスを使用しているためです。これは、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、「リシャーディング、拡張、並列処理」を参照してください。

def initialize(self, shard_id)

process_records

KCL は、initialize メソッドで指定されたシャードからのデータレコードのリストを渡して、このメソッドを呼び出します。実装するレコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。たとえば、ワーカーは、データ変換を実行した後、 S3 バケットに結果を格納できます。

def process_records(self, records, checkpointer)

データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれます。ワーカーはデータを処理するときに、これらの値を使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。record ディクショナリは、レコードのデータ、シーケンス番号、およびパーティションキーにアクセスする次のキーと値のペアを公開します。

record.get('data') record.get('sequenceNumber') record.get('partitionKey')

データは Base64 でエンコードされていることに注意してください。

サンプルでは、メソッド process_records に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。

Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、Checkpointer オブジェクトを process_records に渡すことで、この追跡を処理します。レコードプロセッサは、このオブジェクトの checkpoint メソッドを呼び出して、シャード内のレコードの処理の進行状況を KCL に通知します。ワーカーでエラーが発生した場合、KCL はこの情報を使用して、処理されたことが分かっている最後のレコードからシャードの処理を再開します。

分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサが checkpoint を呼び出して元のシャードの処理がすべて完了したことを通知するまで、新しいシャードの処理を開始しません。

パラメータを渡さない場合、KCL は、checkpoint が呼び出されるときは、レコードプロセッサに渡された最後のレコードまで、すべてのレコードが処理されたと見なします。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合にのみ、checkpoint を呼び出す必要があります。レコードプロセッサは、process_records の各呼び出しで checkpoint を呼び出す必要はありません。たとえば、プロセッサは、3 回呼び出すたびに、checkpoint を呼び出すことができます。オプションでレコードの正確なシーケンス番号をパラメータとして checkpoint に指定できます。この場合、KCL は、すべてのレコードがそのレコードまで処理されたと見なします。

サンプルでは、プライベートメソッド checkpoint で、適切な例外処理と再試行のロジックを使用する Checkpointer.checkpoint メソッドを呼び出す方法を示しています。

KCL は、process_records を使用して、データレコードの処理から発生するすべての例外を処理します。例外が process_records からスローされた場合、KCLは例外発生よりも前に process_records に渡されたデータレコードをスキップします。つまり、これらのレコードは、例外をスローしたレコードプロセッサや、コンシューマー内の他のレコードプロセッサには再送信されません。

shutdown

KCL は、処理が終了した場合(シャットダウンの理由は TERMINATE)またはワーカーが応答していない場合(シャットダウンの reasonZOMBIE)、shutdown メソッドを呼び出します。

def shutdown(self, checkpointer, reason)

シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。

また、KCL は、Checkpointer オブジェクトを shutdown に渡します。シャットダウンの reasonTERMINATE である場合、レコードプロセッサはすべてのデータレコードの処理を終了し、このインターフェイスの checkpoint メソッドを呼び出します。

設定プロパティを変更する

このサンプルでは、設定プロパティのデフォルト値を提供します。これらのプロパティを独自の値にオーバーライドできます(sample.properties を参照してください)。

アプリケーション名

KCL には、ユーザーのアプリケーション間、および同じリージョン内の DynamoDB テーブル間で一意のアプリケーションが必要です。次のようにアプリケーション名の設定値を使用します。

  • このアプリケーション名と関連付けられたすべてのワーカーは、連係して同じストリームを処理していると見なされます。これらのワーカーは複数のインスタンスに分散している場合もあります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。

  • KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報(チェックポイントやワーカーとシャードのマッピングなど)を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、「Amazon Kinesis Data Streams Applicationの状態の追跡」を参照してください。

認証情報の設定

デフォルトの認証情報プロバイダチェーンのいずれかの認証情報プロバイダで AWS の認証情報を使用できるようにする必要があります。AWSCredentialsProvider プロパティを使用して認証情報プロバイダを設定できます。sample.properties で、デフォルトの認証情報プロバイダチェーンのいずれかの認証情報プロバイダでユーザーの認証情報を使用できるようにする必要があります。EC2 インスタンスでコンシューマーアプリケーションを実行している場合は、IAM ロールでインスタンスを設定することをお勧めします。この IAM ロールに関連付けられたアクセス許可を反映する AWS の認証情報は、インスタンスメタデータを通じて、インスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーアプリケーションの認証情報を管理するための最も安全な方法です。

サンプルのプロパティファイルでは、KCL を設定し、sample_kclpy_app.py で指定されているレコードプロセッサを使用して「words」という Kinesis data stream を処理します。