Java での Kinesis クライアントライブラリコンシューマーの開発 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Java での Kinesis クライアントライブラリコンシューマーの開発

Kinesis Data Streams のデータを処理するアプリケーションを構築するには Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Java について説明します。Javadoc リファレンスを表示するには、AWS「クラス 」の Javadoc トピック AmazonKinesisClientを参照してください。

から Java KCL をダウンロードするには GitHub、Kinesis Client Library (Java) にアクセスしてください。Apache Maven で Java KCL を検索するには、KCL 検索結果のページを参照してください。Java KCL コンシューマーアプリケーションのサンプルコードを からダウンロードするには GitHub、 の KCL for Java サンプルプロジェクトページを参照してください GitHub。

このサンプルアプリケーションは Apache Commons Logging を使用します。ログ設定は、configure ファイルで定義されている静的な AmazonKinesisApplicationSample.java メソッドを使用して変更できます。Log4j と AWS Java プリケーションで Apache Commons Logging を使用する方法の詳細については、AWS SDK for JavaデベロッパーガイドLog4j でログ記録するを参照してください。

Java で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

I RecordProcessor メソッドの実装

KCL は現在、IRecordProcessor インターフェイスの 2 つのバージョンをサポートしています。元のインターフェイスは最初のバージョンの KCL で利用可能です。バージョン 2 は KCL バージョン 1.5.0 から利用可能です。両方のインターフェイスが完全にサポートされています。選択するインターフェイスは、お使いのシナリオの要件によって異なります。相違点をすべて確認するには、ローカルに作成した Javadocs、またはソースコードを参照してください。以下のセクションでは、使い始めの最小限の実装を概説します。

オリジナルインターフェイス (バージョン 1)

オリジナルな IRecordProcessor interface (package com.amazonaws.services.kinesis.clientlibrary.interfaces) は、コンシューマーが実装しているべき次のレコードプロセッサメソッドを公開します。このサンプルでは、開始点として使用できる実装を提供しています (AmazonKinesisApplicationSampleRecordProcessor.java を参照してください)。

public void initialize(String shardId) public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
initialize

KCL は、レコードプロセッサがインスタンス化されると、initialize メソッドを呼び出し、特定のシャード ID をパラメータとして渡します。このレコードプロセッサはこのシャードのみを処理し、通常、その逆も真です (このシャードはこのレコード プロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。Kinesis Data Streams は少なくとも 1 回のセマンティクスを使用しています。これは、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、リシャーディング、スケーリング、並列処理を参照してください。

public void initialize(String shardId)
processRecords

KCL は、processRecords メソッドを呼び出し、initialize(shardId) メソッドで指定されたシャードのデータレコードのリストを渡します。レコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。例えば、ワーカーはデータの変換を実行し、その結果を Amazon Simple Storage Service (Amazon S3) バケットに保存する場合があります。

public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)

データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれます。ワーカーはデータを処理するときに、これらの値を使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。Record クラスは、レコードのデータ、シーケンス番号、およびパーティションキーへのアクセスを提供する次のメソッドを公開します。

record.getData() record.getSequenceNumber() record.getPartitionKey()

サンプルでは、プライベートメソッド processRecordsWithRetries に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。

Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、チェックポインタ (IRecordProcessorCheckpointer) を processRecords に渡すことで、この追跡をユーザーに代わって処理します。レコードプロセッサは、このインターフェイスで checkpoint メソッドを呼び出し、シャード内のレコードの処理の進行状況を KCL に知らせます。ワーカーでエラーが発生すると、KCL はこの情報を使用して、処理されたことが分かっている最後のレコードからシャードの処理を再開します。

分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサが checkpoint を呼び出して元のシャードの処理がすべて完了したことを通知するまで、新しいシャードの処理を開始しません。

パラメータを渡さないと、checkpoint への呼び出しは、レコードプロセッサに最後のレコードを渡した時点までのすべてのレコードが処理済みであることを意味すると KCL で見なされます。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合にのみ、checkpoint を呼び出す必要があります。レコードプロセッサは、checkpoint の各呼び出しで processRecords を呼び出す必要はありません。たとえば、プロセッサは、checkpoint を 3 回呼び出すたびに、processRecords を呼び出すことができます。オプションでレコードの正確なシーケンス番号をパラメータとして checkpoint に指定できます。この場合、KCL は、すべてのレコードがそのレコードまで処理されたと見なします。

このサンプルでは、プライベートメソッド checkpoint で、適切な例外処理と再試行のロジックを使用する IRecordProcessorCheckpointer.checkpoint を呼び出す方法を示しています。

KCL は、processRecords を使用して、データレコードの処理から発生するすべての例外を処理します。例外が processRecords からスローされた場合、KCL は、例外発生前に渡されたデータレコードをスキップします。つまり、これらのレコードは、例外をスローしたレコードプロセッサ、またはコンシューマーの他のレコードプロセッサに再送信されません。

shutdown

KCL は、処理が終了した場合 (シャットダウンの理由は TERMINATE) またはワーカーが応答していない場合 (シャットダウンの理由は ZOMBIE)、shutdown メソッドを呼び出します。

public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)

シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。

KCL はまた、IRecordProcessorCheckpointer インターフェイスを shutdown に渡します。シャットダウンの理由が TERMINATE である場合、レコードプロセッサはすべてのデータレコードの処理を終了し、このインターフェイスの checkpoint メソッドを呼び出します。

更新されたインターフェイス (バージョン 2)

更新された IRecordProcessor interface (package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2) は、コンシューマーが実装しているべき次のレコードプロセッサメソッドを公開します。

void initialize(InitializationInput initializationInput) void processRecords(ProcessRecordsInput processRecordsInput) void shutdown(ShutdownInput shutdownInput)

コンテナオブジェクトのメソッドの呼び出しで、インターフェイスのオリジナルバージョンのすべての引数にアクセスできます。たとえば、processRecords() でレコードのリストを取得には、processRecordsInput.getRecords() が使用できます。

このインターフェイスのバージョン 2 (KCL 1.5.0 以降) では、オリジナルインターフェースで提供される入力に加えて次の新しい入力が使用できます。

シーケンス番号の開始

InitializationInput オペレーションへ渡される initialize() オブジェクトでは、開始シーケンス番号はレコードプロセッサのインスタンスに配信されるレコードです。このシーケンス番号は、同じシャードで処理されたレコードプロセッサインスタンスの最後のチェックポイントです。これは、アプリケーションでこの情報が必要になる場合のために提供されます。

保留チェックポイントシーケンス番号

initialize() オペレーションへ渡される InitializationInputオブジェクトの保留チェックポイントシーケンス番号 (ある場合) とは、前のレコードプロセッサインスタンスが停止する前にコミットできなかったものを示します。

I RecordProcessor インターフェイス用の Class Factory の実装

レコードプロセッサのメソッドを実装するクラスのファクトリも実装する必要があります。コンシューマーは、ワーカーをインスタンス化するときに、このファクトリへの参照を渡します。

サンプルでは、オリジナルのレコードプロセッサインターフェースを使用した、AmazonKinesisApplicationSampleRecordProcessorFactory.java ファイルのファクトリクラスを実装します。クラスファクトリでバージョン 2 レコードプロセッサを作成する場合には、com.amazonaws.services.kinesis.clientlibrary.interfaces.v2 とい名のパッケージを使用してください。

public class SampleRecordProcessorFactory implements IRecordProcessorFactory { /** * Constructor. */ public SampleRecordProcessorFactory() { super(); } /** * {@inheritDoc} */ @Override public IRecordProcessor createProcessor() { return new SampleRecordProcessor(); } }

ワーカーの作成

I RecordProcessor メソッドの実装で説明しているように、KCL レコードプロセッサには選択できる 2 バージョンがあり、どちらを選ぶかでワーカーの作成方法に影響します。オリジナルレコードプロセッサインターフェイスは、次のコードストラクチャを使用してワーカーを作成します。

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker(recordProcessorFactory, config);

レコード プロセッサインターフェイスのバージョン 2 では、Worker.Builder を使用してワーカを作成でき、どのコンストラクタを使うかや引数の順序を考慮する必要はありません。更新されたレコードプロセッサインターフェイスは、次のコードストラクチャを使用してワーカーを作成します。

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();

設定プロパティを変更する

このサンプルでは、設定プロパティのデフォルト値を提供します。ワーカーのこの設定データは KinesisClientLibConfiguration オブジェクトにまとめられています。ワーカーをインスタンス化する呼び出しで、このオブジェクトと IRecordProcessor のクラスファクトリへの参照が渡されます。Java の properties ファイルを使用してこれらのプロパティを独自の値にオーバーライドできます (AmazonKinesisApplicationSample.java を参照してください)。

アプリケーション名

KCL には、複数のアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーション名が必要です。次のようにアプリケーション名の設定値を使用します。

  • このアプリケーション名と関連付けられたすべてのワーカーは、連係して同じストリームを処理していると見なされます。これらのワーカーは複数のインスタンスに分散している場合もあります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。

  • KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーとシャードのマッピングなど) を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡するを参照してください。

認証情報を設定する

デフォルトの認証情報プロバイダチェーンのいずれかの認証情報プロバイダで AWS の認証情報を使用できるようにする必要があります。例えば、EC2 インスタンスでコンシューマーを実行している場合は、IAM ロールでインスタンスを起動することをお勧めします。この IAM ロールに関連付けられた許可を反映する AWS 認証情報は、インスタンスメタデータを通じて、インスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーの認証情報を管理するための最も安全な方法です。

サンプルアプリケーションは、最初にインスタンスメタデータから IAM 認証情報を取得しようとします。

credentialsProvider = new InstanceProfileCredentialsProvider();

サンプルアプリケーションは、インスタンスメタデータから認証情報を取得できない場合、properties ファイルから認証情報を取得しようとします。

credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();

インスタンスメタデータの詳細については、Linux インスタンス用 Amazon EC2 ユーザーガイドインスタンスメタデータを参照してください。

複数のインスタンスへのワーカー ID の使用

サンプルの初期化コードは、次のコードスニペットに示すように、ローカルコンピュータ名にグローバル一意識別子を追加して、ワーカーの ID (workerId) を作成します。このアプローチによって、1 台のコンピュータでコンシューマーアプリケーションの複数のインスタンスを実行するシナリオに対応できます。

String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

レコードプロセッサインターフェイスのバージョン 2 への移行

オリジナルインターフェースで使われるコードを移行するためには、上記のステップに加えて、次の手順が必要となります。

  1. レコードプロセッサのクラスを変更して、バージョン 2 レコードプロセッサインターフェイスにインポートします。

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  2. コンテナオブジェクトで get メソッドを使用するには、入力するリファレンスを変更します。たとえば、shutdown() オペレーションで、checkpointershutdownInput.getCheckpointer() に変更します。

  3. レコードプロセッサのファクトリークラスを変更して、バージョン 2 レコードプロセッサファクトリーインターフェイスにインポートします。

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  4. ワーカーのコンストラクチャを変更して、Worker.Builder を使います。例:

    final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();