メニュー
Amazon Kinesis Data Streams
開発者ガイド

Java での Kinesis Client Library コンシューマーの開発

Kinesis Client Library(KCL)を使用して Kinesis data stream からデータを処理するアプリケーションを構築できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Java について説明します。javadoc レファレンスを表示するには、「AWS javadoc topic for Class AmazonKinesisClient」を参照してください。

GitHub から Java KCL をダウンロードするには、「Kinesis Client Library (Java)」を参照してください。 Maven で Java KCL を検索するには、「KCLsearch results」参照してください。 Java KCL コンシューマーアプリケーションのサンプルコードをダウンロードするには、GitHub の「KCL for Java sample project」を参照してください。

このサンプルアプリケーションは Apache Commons Logging を使用します。configure ファイルで定義されている静的メソッド AmazonKinesisApplicationSample.java を使用して、ログ記録の設定を変更できます。Apache Commons Logging を Log4j や AWS Java アプリケーションとともに使用する方法の詳細については、『AWS SDK for Java Developer Guide』の「Log4j を使用したログ記録」を参照してください。

Java で KCLコンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

IRecordProcessor メソッドを実装する

KCL は現在、IRecordProcessor インターフェイスの 2 つのバージョンをサポートしています。最初のインターフェースは KCL のファーストバージョンで、バージョン 2 は KCL 1.5.0 バージョンから利用可能です。どちらのインターフェイスも完全にサポートされています。どちらにするかの選択は、お使いのシナリオの要件によって異なります。 違いについての詳細は、ローカルの Java ビルドの Java ドキュメント、あるいはソースコードを参照してください。 以下のセクションでは、使い始めの最小限の実装を概説します。

オリジナルインターフェイス(バージョン 1)

オリジナルな IRecordProcessor interface (package com.amazonaws.services.kinesis.clientlibrary.interfaces) は、コンシューマーが実装しているべき次のレコードプロセッサメソッドを公開します。 このサンプルでは、開始点として使用できる実装を提供しています(AmazonKinesisApplicationSampleRecordProcessor.java を参照してください)。

public void initialize(String shardId) public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)

initialize

KCL は、レコードプロセッサがインスタンス化されたときに、パラメータとして特定のシャード ID を渡して、initialize メソッドを呼び出します。このレコードプロセッサはこのシャードのみを処理し、通常、その逆も真です(このシャードはこのレコード プロセッサによってのみ処理されます)。 ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。Kinesis Data Streams では「少なくとも 1 回」のセマンティクスを使用しています。これは、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、「リシャーディング、拡張、並列処理」を参照してください。

public void initialize(String shardId)

processRecords

KCL は、initialize(shardId) メソッドで指定されたシャードからのデータレコードのリストを渡して、processRecords メソッドを呼び出します。 レコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。たとえば、ワーカーは、データ変換を実行した後、Amazon S3 バケットに結果を格納できます。

public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)

データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれます。ワーカーはデータを処理するときに、これらの値を使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。Record クラスは、レコードのデータ、シーケンス番号、およびパーティションキーへのアクセスを提供する次のメソッドを公開します。

record.getData() record.getSequenceNumber() record.getPartitionKey()

サンプルでは、プライベートメソッド processRecordsWithRetries に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。

Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、チェックポインタ(IRecordProcessorCheckpointer)を processRecords に渡すことで、この追跡を処理します。レコードプロセッサは、このインターフェイスの checkpoint メソッドを呼び出して、シャード内のレコードの処理の進行状況を KCL に通知します。ワーカーでエラーが発生した場合、KCL はこの情報を使用して、処理されたことが分かっている最後のレコードからシャードの処理を再開します。

分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサが checkpoint を呼び出して元のシャードの処理がすべて完了したことを通知するまで、新しいシャードの処理を開始しません。

パラメータを渡さない場合、KCL は、checkpoint が呼び出されるときは、レコードプロセッサに渡された最後のレコードまで、すべてのレコードが処理されたと見なします。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合にのみ、checkpoint を呼び出す必要があります。レコードプロセッサは、processRecords の各呼び出しで checkpoint を呼び出す必要はありません。たとえば、プロセッサは、checkpoint を 3 回呼び出すたびに、processRecords を呼び出すことができます。オプションでレコードの正確なシーケンス番号をパラメータとして checkpoint に指定できます。この場合、KCL は、すべてのレコードがそのレコードまで処理されたと見なします。

このサンプルでは、プライベートメソッド checkpoint で、適切な例外処理と再試行のロジックを使用する IRecordProcessorCheckpointer.checkpoint を呼び出す方法を示しています。

KCL は、processRecords を使用して、データレコードの処理から発生するすべての例外を処理します。例外が processRecords からスローされた場合、KCL は例外発生よりも前に渡されたデータレコードをスキップします。つまり、これらのレコードは、例外をスローしたレコードプロセッサや、コンシューマー内の他のレコードプロセッサには再送信されません。

shutdown

KCL は、処理が終了した場合(シャットダウンの理由は TERMINATE)またはワーカーが応答していない場合(シャットダウンの理由は ZOMBIE)、shutdown メソッドを呼び出します。

public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)

シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。

KCL はまた、IRecordProcessorCheckpointer インターフェイスを shutdown に渡します。 シャットダウンの理由が TERMINATE である場合、レコードプロセッサはすべてのデータレコードの処理を終了し、このインターフェイスの checkpoint メソッドを呼び出します。

更新されたインターフェイス(バージョン 2)

更新された IRecordProcessor interface (package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2) は、コンシューマーが実装しているべき次のレコードプロセッサメソッドを公開します。

void initialize(InitializationInput initializationInput) void processRecords(ProcessRecordsInput processRecordsInput) void shutdown(ShutdownInput shutdownInput)

コンテナオブジェクトのメソッドの呼び出しで、インターフェイスのオリジナルバージョンのすべての引数にアクセスできます。 たとえば、processRecords() でレコードのリストを取得には、processRecordsInput.getRecords() が使用できます。

このインターフェイスのバージョン 2(KCL 1.5.0 以降)0では、オリジナルインターフェースで提供される入力に加えて次の新しい入力が使用できます。

シーケンス番号の開始

initialize() オペレーションへ渡される InitializationInput オブジェクトでは、開始シーケンス番号はレコードプロセッサのインスタンスに配信されるレコードです。 このシーケンス番号は、同じシャードで処理されたレコードプロセッサインスタンスの最後のチェックポイントです。 これは、アプリケーションでこの情報が必要になる場合のために提供されます。

保留チェックポイントシーケンス番号

initialize() オペレーションへ渡される InitializationInput オブジェクトの保留チェックポイントシーケンス番号(ある場合)とは、前のレコードプロセッサが停止する以前に実行できなかったものを示します。

IRecordProcessor インターフェイスのクラスファクトリを実装する

レコードプロセッサのメソッドを実装するクラスのファクトリも実装する必要があります。コンシューマーは、ワーカーをインスタンス化するときに、このファクトリへの参照を渡します。

サンプルでは、オリジナルのレコードプロセッサインターフェースを使用した、AmazonKinesisApplicationSampleRecordProcessorFactory.java ファイルのファクトリクラスを実装します。 クラスファクトリでバージョン 2 レコードプロセッサを作成する場合には、com.amazonaws.services.kinesis.clientlibrary.interfaces.v2 とい名のパッケージを使用してください。

public class SampleRecordProcessorFactory implements IRecordProcessorFactory { /** * Constructor. */ public SampleRecordProcessorFactory() { super(); } /** * {@inheritDoc} */ @Override public IRecordProcessor createProcessor() { return new SampleRecordProcessor(); } }

ワーカーの作成

IRecordProcessor メソッドを実装する」で説明されるように、KCL レコードプロセッサには選択できる 2 バージョンがあり、ワーカーを作成する方法に影響します。 オリジナルレコードプロセッサインターフェイスは、次のコードストラクチャを使用してワーカーを作成します。

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker(recordProcessorFactory, config);

レコード プロセッサインターフェイスのバージョン 2 では、Worker.Builder を使用してワーカを作成でき、どのコンストラクタを使うかや引数の順序を考慮する必要はありません。 更新されたレコードプロセッサインターフェイスは、次のコードストラクチャを使用してワーカーを作成します。

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();

設定プロパティを変更する

このサンプルでは、設定プロパティのデフォルト値を提供します。ワーカーのこの設定データは KinesisClientLibConfiguration オブジェクトにまとめられています。ワーカーをインスタンス化する呼び出しで、このオブジェクトと IRecordProcessor のクラスファクトリへの参照が渡されます。Java の properties ファイルを使用してこれらのプロパティを独自の値にオーバーライドできます(AmazonKinesisApplicationSample.java を参照してください)。

アプリケーション名

KCL には、ユーザーのアプリケーション間、および同じリージョン内の DynamoDB テーブル間で一意のアプリケーション名が必要です。次のようにアプリケーション名の設定値を使用します。

  • このアプリケーション名と関連付けられたすべてのワーカーは、連係して同じストリームを処理していると見なされます。これらのワーカーは複数のインスタンスに分散している場合もあります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。

  • KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報(チェックポイントやワーカーとシャードのマッピングなど)を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、「Amazon Kinesis Data Streams Applicationの状態の追跡」を参照してください。

認証情報の設定

デフォルトの認証情報プロバイダチェーンのいずれかの認証情報プロバイダで AWS の認証情報を使用できるようにする必要があります。たとえば、EC2 インスタンスでコンシューマーを実行している場合は、IAM ロールでインスタンスを起動することをお勧めします。この IAM ロールに関連付けられたアクセス許可を反映する AWS の認証情報は、インスタンスメタデータを通じて、インスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーの認証情報を管理するための最も安全な方法です。

サンプルアプリケーションは、最初にインスタンスメタデータから IAM の認証情報を取得しようとします。

credentialsProvider = new InstanceProfileCredentialsProvider();

サンプルアプリケーションは、インスタンスメタデータから認証情報を取得できない場合、properties ファイルから認証情報を取得しようとします。

credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();

インスタンスメタデータの詳細については、『Linux インスタンス用 Amazon EC2 ユーザーガイド』の「インスタンスメタデータ」を参照してください。

複数のインスタンスへのワーカー ID の使用

サンプルの初期化コードは、次のコードスニペットに示すように、ローカルコンピュータ名にグローバル一意識別子を追加して、ワーカーの ID(workerId)を作成します。このアプローチによって、1 台のコンピュータでコンシューマーアプリケーションの複数のインスタンスを実行するシナリオに対応できます。

String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

レコードプロセッサインターフェイスのバージョン 2 への移行

オリジナルインターフェースで使われるコードを移行するためには、上記のステップに加えて、次の手順が必要となります。

  1. レコードプロセッサのクラスを変更して、バージョン 2 レコードプロセッサインターフェイスにインポートします。

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  2. 入力するレファレンスを変更して、コンテナオブジェクトでメソッドの呼び出しを使います。 たとえば、shutdown() オペレーションで、checkpointershutdownInput.getCheckpointer() に変更します。

  3. レコードプロセッサのファクトリークラスを変更して、バージョン 2 レコードプロセッサファクトリーインターフェイスにインポートします。

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  4. ワーカーのコンストラクチャを変更して、Worker.Builder を使います。 (例:

    final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();