Cookie の設定を選択する

当社は、当社のサイトおよびサービスを提供するために必要な必須 Cookie および類似のツールを使用しています。当社は、パフォーマンス Cookie を使用して匿名の統計情報を収集することで、お客様が当社のサイトをどのように利用しているかを把握し、改善に役立てています。必須 Cookie は無効化できませんが、[カスタマイズ] または [拒否] をクリックしてパフォーマンス Cookie を拒否することはできます。

お客様が同意した場合、AWS および承認された第三者は、Cookie を使用して便利なサイト機能を提供したり、お客様の選択を記憶したり、関連する広告を含む関連コンテンツを表示したりします。すべての必須ではない Cookie を受け入れるか拒否するには、[受け入れる] または [拒否] をクリックしてください。より詳細な選択を行うには、[カスタマイズ] をクリックしてください。

Python で Kinesis クライアントライブラリコンシューマーを開発する

フォーカスモード
Python で Kinesis クライアントライブラリコンシューマーを開発する - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

注記

Kinesis Client Library (KCL) バージョン 1.x および 2.x は古くなっています。KCL バージョン 3.x に移行することをお勧めします。これにより、パフォーマンスと新機能が向上します。最新の KCL ドキュメントと移行ガイドについては、「」を参照してくださいKinesis Client Library を使用する

Kinesis Data Streams のデータを処理するアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Python について説明します。

KCL は Java ライブラリであり、Java 以外の言語のサポートは、MultiLangDaemon と呼ばれる多言語インターフェースを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。そのため、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要があるデフォルト設定がいくつかあります。GitHub の MultiLangDaemon の詳細については、KCL MultiLangDaemon projectのページを参照してください。

GitHub から Python KCL をダウンロードするには、Kinesis Client Library (Python) にアクセスしてください。Python KCL コンシューマーアプリケーションのサンプルコードをダウンロードするには、GitHub でKCL for Python sample projectページにアクセスしてください。

Python で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

RecordProcessor クラスのメソッドを実装する

RecordProcess クラスでは、RecordProcessorBase を拡張して次のメソッドを実装する必要があります。このサンプルでは、開始点として使用できる実装を提供しています (sample_kclpy_app.py を参照してください)。

def initialize(self, shard_id) def process_records(self, records, checkpointer) def shutdown(self, checkpointer, reason)
initialize

KCL は、レコードプロセッサがインスタンス化されると、initialize メソッドを呼び出し、特定のシャード ID をパラメータとして渡します。このレコードプロセッサはこのシャードのみを処理し、通常、その逆も真です (このシャードはこのレコード プロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。これは、Kinesis Data Streams は少なくとも 1 回のセマンティクスを使用しているからです。つまり、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。を参照してください。

def initialize(self, shard_id)
process_records

KCL は、このメソッドを呼び出し、initialize メソッドで指定されたシャードのデータレコードのリストを渡します。実装するレコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。例えば、ワーカーはデータの変換を実行し、その結果を Amazon Simple Storage Service (Amazon S3) バケットに保存する場合があります。

def process_records(self, records, checkpointer)

データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれます。ワーカーはデータを処理するときに、これらの値を使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。record ディクショナリは、レコードのデータ、シーケンス番号、およびパーティションキーにアクセスする次のキーと値のペアを公開します。

record.get('data') record.get('sequenceNumber') record.get('partitionKey')

データは Base64 でエンコードされていることに注意してください。

サンプルでは、メソッド process_records に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。

Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、Checkpointer オブジェクトを process_records に渡すことで、この追跡をユーザーに代わって処理します。レコードプロセッサは、このオブジェクトの checkpoint メソッドを呼び出して、シャード内のレコードの処理の進行状況を KCL に通知します。ワーカーでエラーが発生すると、KCL はこの情報を使用して、処理されたことが分かっている最後のレコードからシャードの処理を再開します。

分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサが checkpoint を呼び出して元のシャードの処理がすべて完了したことを通知するまで、新しいシャードの処理を開始しません。

パラメータを渡さないと、checkpoint への呼び出しは、レコードプロセッサに最後のレコードを渡した時点までのすべてのレコードが処理済みであることを意味すると KCL で見なされます。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合にのみ、checkpoint を呼び出す必要があります。レコードプロセッサは、checkpoint の各呼び出しで process_records を呼び出す必要はありません。たとえば、プロセッサは、3 回呼び出すたびに、checkpoint を呼び出すことができます。オプションでレコードの正確なシーケンス番号をパラメータとして checkpoint に指定できます。この場合、KCL は、そのレコードまでのすべてのレコードだけが処理されたと見なします。

サンプルでは、プライベートメソッド checkpoint で、適切な例外処理と再試行のロジックを使用する Checkpointer.checkpoint メソッドを呼び出す方法を示しています。

KCL は、process_records を使用して、データレコードの処理から発生するすべての例外を処理します。例外が process_records からスローされた場合、 は、例外発生前に に渡されたデータレコードをスキップします。つまり、これらのレコードは、例外をスローしたレコードプロセッサ、またはコンシューマーの他のレコードプロセッサに再送信されません。

shutdown

KCL は、処理が終了した場合 (シャットダウンの理由は TERMINATE) またはワーカーが応答していない場合 (シャットダウンの reasonZOMBIE)、shutdown メソッドを呼び出します。

def shutdown(self, checkpointer, reason)

シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。

また、KCL は、Checkpointer オブジェクトも shutdown に渡します。シャットダウンの reasonTERMINATE である場合、レコードプロセッサはすべてのデータレコードの処理を終了し、このインターフェイスの checkpoint メソッドを呼び出します。

設定プロパティを変更する

このサンプルでは、設定プロパティのデフォルト値を提供します。これらのプロパティを独自の値にオーバーライドできます (sample.properties を参照してください)。

アプリケーション名

KCL には、複数のアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーションが必要です。次のようにアプリケーション名の設定値を使用します。

  • このアプリケーション名と関連付けられたワーカーはすべて、同じストリーム上で連携して処理しているとみなされます。これらのワーカーは複数のインスタンスに分散している場合があります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。

  • KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーとシャードのマッピングなど) を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡するを参照してください。

認証情報の設定

デフォルトの AWS 認証情報プロバイダーチェーンの認証情報プロバイダーの 1 つが認証情報を使用できるようにする必要があります。AWSCredentialsProvider プロパティを使用して認証情報プロバイダーを設定できます。sample.properties では、デフォルトの認証情報プロバイダーチェーンのいずれかの認証情報プロバイダーに対して、ユーザーの認証情報を使用可能にする必要があります。Amazon EC2 インスタンスでコンシューマーアプリケーションを実行している場合は、IAM ロールでインスタンスを設定することをお勧めします。この IAM ロールに関連付けられた許可を反映する AWS 認証情報は、インスタンスメタデータを通じて、インスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーアプリケーションの認証情報を管理するための最も安全な方法です。

サンプルのプロパティファイルでは、 で指定されているレコードプロセッサを使用してwordsという Kinesis data stream を処理するように KCL を設定します。

このページの内容

プライバシーサイト規約Cookie の設定
© 2025, Amazon Web Services, Inc. or its affiliates.All rights reserved.