Node.js での Kinesis クライアントライブラリコンシューマーの開発 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Node.js での Kinesis クライアントライブラリコンシューマーの開発

Kinesis Client Library (KCL) を使用して、Kinesis Data Streams からデータを処理するアプリケーションを構築できます。Kinesis クライアントライブラリは、複数の言語で使用できます。このトピックでは、Node.js について説明します。

KCLはJavaライブラリである。Java以外の言語のサポートは、MultiLangデーモン。このデーモンは Java ベースで、Java 以外の KCL 言語を使用しているときにバックグラウンドで実行されます。そのため、KCL for Node.js をインストールして、コンシューマーアプリケーションをすべて Node.js で書く場合でも、MultiLangデーモン。さらに、MultiLangデーモンには、ユースケースに合わせてカスタマイズする必要のあるデフォルト設定例があります。AWS接続先のリージョン。の詳細については、「」を参照してください。MultiLangデーモンオンGitHubを参照のことKCLMultiLangデーモンプロジェクトページで.

から Node.js KCL をダウンロードするにはGitHubの参照先情報Kinesis クライアントライブラリ (Node.js)

サンプルコードのダウンロード

Node.js の KCL で使用可能な 2 つのサンプルコードがあります。

  • 基本サンプル

    Node.js で KCL コンシューマーアプリケーションを構築する方法の基本を説明する次のセクションで使用されます。

  • click-stream-sample

    基本サンプルコードを理解したあとの、やや上級で実際のシナリオを使用したサンプル。このサンプルはここでは説明しませんが、詳細を説明した README ファイルがあります。

Node.js で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

レコードプロセッサを実装する

Node.js 用の KCL を使用した最もシンプルなコンシューマーは、recordProcessor関数。この関数には、次の関数が含まれます。initialize,processRecords, およびshutdown。このサンプルでは、開始点として使用できる実装を提供しています (sample_kcl_app.js を参照してください)。

function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}
initialize

KCLは、initializeレコードプロセッサが起動すると、関数が起動します。このレコードプロセッサは initializeInput.shardId として渡されるシャード ID のみを処理し、通常、その逆も真です (このシャードはこのレコードプロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。これは、Kinesis Data Streams には少なくとも 1 回はセマンティクスとは、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、「リシャーディング、拡張、並列処理」を参照してください。

initialize: function(initializeInput, completeCallback)
processRecords

KCL は、この関数を呼び出して、指定されているシャードのデータレコードのリストが含まれている入力を使用します。initializefunction. 実装するレコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。たとえば、ワーカーは、データ変換を実行した後、Amazon Simple Storage Service (Amazon S3) バケットに結果を格納できます。

processRecords: function(processRecordsInput, completeCallback)

データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれ、ワーカーはデータを処理するときに、これらを使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。record ディクショナリは、レコードのデータ、シーケンス番号、およびパーティションキーにアクセスする次のキーと値のペアを公開します。

record.data record.sequenceNumber record.partitionKey

データは Base64 でエンコードされていることに注意してください。

基本サンプルでは、関数 processRecords に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。

Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、この追跡をで処理します。checkpointerとして渡されたオブジェクトprocessRecordsInput.checkpointer。レコードプロセッサは、checkpointer.checkpoint関数を使用して、シャード内のレコードの処理の進行状況をKCL に知らせます。ワーカーでエラーが発生した場合、シャードの処理を再開するときに、処理されたことが分かっている最後のレコードから再開するように KCL はこの情報を使用します。

分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサがcheckpoint元のシャードのすべての処理が完了したことを通知します。

シーケンス番号を渡さないとcheckpoint関数を使用すると、KCL はcheckpointは、レコードプロセッサに最後のレコードを渡した時点までのすべてのレコードが処理済みであることを意味します。したがって、レコードプロセッサはcheckpoint のみ渡されたリスト内のすべてのレコードを処理した後。レコードプロセッサは、checkpoint の各呼び出しで processRecords を呼び出す必要はありません。たとえば、プロセッサは checkpoint を 3 回の呼び出しごとに呼び出したり、レコードプロセッサの外部イベント (実装したカスタムの認証または検証サービスなど) で呼び出したりできます。

オプションでレコードの正確なシーケンス番号をパラメータとして checkpoint に指定できます。この場合、KCL は、すべてのレコードがそのレコードまで処理されたと見なします。

基本サンプルアプリケーションでは、checkpointer.checkpoint 関数の最もシンプルな呼び出しを示します。関数のこの時点でコンシューマーに必要な他のチェックポイントロジックを追加できます。

shutdown

KCLは、shutdown処理が終了したときに機能する (shutdownInput.reasonですTERMINATE) または作業者が応答しなくなっている (shutdownInput.reasonですZOMBIE).

shutdown: function(shutdownInput, completeCallback)

シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。

KCLも渡すshutdownInput.checkpointer参照先オブジェクトshutdown。シャットダウンの理由が TERMINATE である場合、レコードプロセッサがすべてのデータレコードの処理を終了したことを確認し、このインターフェイスの checkpoint 関数を呼び出します。

設定プロパティを変更する

このサンプルでは、設定プロパティのデフォルト値を提供します。これらのプロパティを独自の値にオーバーライドできます (基本サンプルの sample.properties を参照してください)。

アプリケーション名

KCL には、ユーザーのアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーションが必要です。次のようにアプリケーション名の設定値を使用します。

  • このアプリケーション名と関連付けられたすべてのワーカーは、連係して同じストリームを処理していると見なされます。これらのワーカーは複数のインスタンスに分散している場合もあります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。

  • KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーシャードマッピングなど) を維持します。各アプリケーションには、それぞれの DynamoDB テーブルがあります。詳細については、「リーステーブルを使用して KCL コンシューマアプリケーションによって処理されたシャードを追跡する」を参照してください。

認証情報を設定する

あなたは自分のものを作らなければなりませんAWSデフォルトの認証情報プロバイダーチェーンのいずれかの認証情報プロバイダーに対して、使用可能な認証情報。AWSCredentialsProvider プロパティを使用して認証情報プロバイダーを設定できます。sample.properties ファイルでは、デフォルトの認証情報プロバイダーチェーンのいずれかの認証情報プロバイダーに対して、ユーザーの認証情報を使用可能にする必要があります。Amazon EC2 インスタンスでコンシューマーを実行している場合は、IAM ロールでインスタンスを設定することをお勧めします。AWSこの IAM ロールに関連付けられたアクセス許可を反映する認証情報は、インスタンスメタデータを通じて、インスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーアプリケーションの認証情報を管理するための最も安全な方法です。

次の例は、という名前の Kinesis データストリームを処理するように KCL を設定します。kclnodejssampleで提供されているレコードプロセッサを使用するsample_kcl_app.js:

# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default AWS credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON