Java での Kinesis クライアントライブラリコンシューマーの開発 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Java での Kinesis クライアントライブラリコンシューマーの開発

Kinesis Client Library (KCL) を使用して、Kinesis Data Streams からデータを処理するアプリケーションを構築できます。Kinesis クライアントライブラリは、複数の言語で使用できます。このトピックでは、Java について説明します。Javadoc リファレンスを表示するには、「」を参照してください。AWSクラスの Javadoc トピックAmazonKinesisクライアント

Java KCL をからダウンロードするにはGitHubの参照先情報Kinesis Client Library (Java)。Apache Maven で Java KCL を検索するには、KCL 検索結果ページで. Java KCL コンシューマアプリケーションのサンプルコードをからダウンロードするにはGitHubを参照のことKCL for Java サンプルプロジェクトページでGitHub。

このサンプルアプリケーションは Apache Commons Logging を使用します。ログ設定は、configure ファイルで定義されている静的な AmazonKinesisApplicationSample.java メソッドを使用して変更できます。Log4j での Apache Commons Logging の使用方法の詳細については、「」を参照してください。AWSJava アプリケーション、を参照してくださいLog4j でログAWS SDK for Javaデベロッパーガイド

Java で KCL コンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

I を実装するRecordProcessorメソッド

KCL は現在、2 つのバージョンのIRecordProcessorインターフェイス:元のインターフェイスは KCL の最初のバージョンで使用でき、バージョン 2 は KCL バージョン 1.5.0 から利用可能です。両方のインターフェイスが完全にサポートされています。選択するインターフェイスは、お使いのシナリオの要件によって異なります。相違点をすべて確認するには、ローカルに作成した Javadocs、またはソースコードを参照してください。以下のセクションでは、使い始めの最小限の実装を概説します。

オリジナルインターフェイス (バージョン 1)

オリジナルな IRecordProcessor interface (package com.amazonaws.services.kinesis.clientlibrary.interfaces) は、コンシューマーが実装しているべき次のレコードプロセッサメソッドを公開します。このサンプルでは、開始点として使用できる実装を提供しています (AmazonKinesisApplicationSampleRecordProcessor.java を参照してください)。

public void initialize(String shardId) public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
initialize

KCLは、initializeメソッドレコードプロセッサがインスタンス化されると、メソッドで、特定のシャード ID をパラメータとして渡します。このレコードプロセッサはこのシャードのみを処理し、通常、その逆も真です (このシャードはこのレコード プロセッサによってのみ処理されます)。ただし、コンシューマーでは、データレコードが複数回処理される可能性に対応する必要があります。Kinesis Data Streams には少なくとも 1 回はセマンティクスとは、シャードから取得されたすべてのデータレコードが、コンシューマーのワーカーによって少なくとも 1 回処理されることを意味します。特定のシャードが複数のワーカーによって処理される可能性がある場合の詳細については、「リシャーディング、拡張、並列処理」を参照してください。

public void initialize(String shardId)
processRecords

KCLは、processRecordsメソッド。で指定されたシャードのデータレコードのリストを渡します。initialize(shardId)方法。レコードプロセッサは、コンシューマーのセマンティクスに従って、これらのレコードのデータを処理します。たとえば、ワーカーは、データ変換を実行した後、Amazon Simple Storage Service (Amazon S3) バケットに結果を格納できます。

public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)

データ自体に加えて、レコードにもシーケンス番号とパーティションキーが含まれます。ワーカーはデータを処理するときに、これらの値を使用できます。たとえば、ワーカーは、パーティションのキーの値に基づいて、データを格納する S3 バケットを選択できます。Record クラスは、レコードのデータ、シーケンス番号、およびパーティションキーへのアクセスを提供する次のメソッドを公開します。

record.getData() record.getSequenceNumber() record.getPartitionKey()

サンプルでは、プライベートメソッド processRecordsWithRetries に、ワーカーでレコードのデータ、シーケンス番号、およびパーティションキーにアクセスする方法を示すコードが含まれています。

Kinesis Data Streams では、シャードで既に処理されたレコードを追跡するためにレコードプロセッサが必要です。KCL は、チェックポインタ () を渡すことで、この追跡をユーザーに代わり処理します。IRecordProcessorCheckpointer) にprocessRecords。レコードプロセッサは、checkpointこのインターフェイスでメソッドを使用すると、シャード内のレコードの処理の進行状況をKCL に知らせます。ワーカーでエラーが発生すると、KCL はこの情報を使用して、処理されたことが分かっている最後のレコードからシャードの処理を再開します。

分割または結合オペレーションの場合、KCL は、元のシャードのプロセッサがcheckpoint元のシャードのすべての処理が完了したことを通知します。

パラメータを渡さない場合、KCL は、checkpointは、レコードプロセッサに最後のレコードを渡した時点までのすべてのレコードが処理済みであることを意味します。したがって、レコードプロセッサは、渡されたリストにあるすべてのレコードの処理が完了した場合にのみ、checkpoint を呼び出す必要があります。レコードプロセッサは、checkpoint の各呼び出しで processRecords を呼び出す必要はありません。たとえば、プロセッサは、checkpoint を 3 回呼び出すたびに、processRecords を呼び出すことができます。オプションでレコードの正確なシーケンス番号をパラメータとして checkpoint に指定できます。この場合、KCL は、すべてのレコードがそのレコードまで処理されたと見なします。

このサンプルでは、プライベートメソッド checkpoint で、適切な例外処理と再試行のロジックを使用する IRecordProcessorCheckpointer.checkpoint を呼び出す方法を示しています。

KCLは依存しているprocessRecordsを使用して、データレコードの処理で発生するすべての例外を処理します。から例外がスローされた場合processRecordsの場合、KCL は、例外発生前に渡されたデータレコードをスキップします。つまり、これらのレコードは、例外をスローしたレコードプロセッサ、またはコンシューマーの他のレコードプロセッサに再送信されません。

shutdown

KCLは、shutdown処理が終了したときの方法(シャットダウンの理由はTERMINATE) またはワーカーが応答していない場合 (シャットダウンの理由はZOMBIE).

public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)

シャードが分割または結合されたか、ストリームが削除されたため、レコードプロセッサがシャードからこれ以上レコードを受信しない場合は、処理が終了します。

KCLも渡すIRecordProcessorCheckpointerインターフェイスからshutdown。シャットダウンの理由が TERMINATE である場合、レコードプロセッサはすべてのデータレコードの処理を終了し、このインターフェイスの checkpoint メソッドを呼び出します。

更新されたインターフェイス (バージョン 2)

更新された IRecordProcessor interface (package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2) は、コンシューマーが実装しているべき次のレコードプロセッサメソッドを公開します。

void initialize(InitializationInput initializationInput) void processRecords(ProcessRecordsInput processRecordsInput) void shutdown(ShutdownInput shutdownInput)

コンテナオブジェクトのメソッドの呼び出しで、インターフェイスのオリジナルバージョンのすべての引数にアクセスできます。たとえば、processRecords() でレコードのリストを取得には、processRecordsInput.getRecords() が使用できます。

このインターフェイスのバージョン 2 (KCL 1.5.0 以降) 0では、オリジナルインターフェイスで提供される入力に加えて次の新しい入力が使用できます。

シーケンス番号の開始

InitializationInput オペレーションへ渡される initialize() オブジェクトでは、開始シーケンス番号はレコードプロセッサのインスタンスに配信されるレコードです。このシーケンス番号は、同じシャードで処理されたレコードプロセッサインスタンスの最後のチェックポイントです。これは、アプリケーションでこの情報が必要になる場合のために提供されます。

保留チェックポイントシーケンス番号

initialize() オペレーションへ渡される InitializationInput オブジェクトの保留チェックポイントシーケンス番号 (ある場合) とは、前のレコードプロセッサインスタンスが停止する前にコミットできなかったものを示します。

I のクラスファクトリを実装するRecordProcessorインターフェイス

レコードプロセッサのメソッドを実装するクラスのファクトリも実装する必要があります。コンシューマーは、ワーカーをインスタンス化するときに、このファクトリへの参照を渡します。

サンプルでは、オリジナルのレコードプロセッサインターフェースを使用した、AmazonKinesisApplicationSampleRecordProcessorFactory.java ファイルのファクトリクラスを実装します。クラスファクトリでバージョン 2 レコードプロセッサを作成する場合には、com.amazonaws.services.kinesis.clientlibrary.interfaces.v2 とい名のパッケージを使用してください。

public class SampleRecordProcessorFactory implements IRecordProcessorFactory { /** * Constructor. */ public SampleRecordProcessorFactory() { super(); } /** * {@inheritDoc} */ @Override public IRecordProcessor createProcessor() { return new SampleRecordProcessor(); } }

ワーカーの作成

で説明したようにI を実装するRecordProcessorメソッドには、KCL レコードプロセッサインターフェイスの 2 つのバージョンから選択でき、ワーカーの作成方法に影響します。オリジナルレコードプロセッサインターフェイスは、次のコードストラクチャを使用してワーカーを作成します。

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker(recordProcessorFactory, config);

レコード プロセッサインターフェイスのバージョン 2 では、Worker.Builder を使用してワーカを作成でき、どのコンストラクタを使うかや引数の順序を考慮する必要はありません。更新されたレコードプロセッサインターフェイスは、次のコードストラクチャを使用してワーカーを作成します。

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();

設定プロパティを変更する

このサンプルでは、設定プロパティのデフォルト値を提供します。ワーカーのこの設定データは KinesisClientLibConfiguration オブジェクトにまとめられています。ワーカーをインスタンス化する呼び出しで、このオブジェクトと IRecordProcessor のクラスファクトリへの参照が渡されます。Java の properties ファイルを使用してこれらのプロパティを独自の値にオーバーライドできます (AmazonKinesisApplicationSample.java を参照してください)。

アプリケーション名

KCL には、ユーザーのアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーション名が必要です。次のようにアプリケーション名の設定値を使用します。

  • このアプリケーション名と関連付けられたすべてのワーカーは、連係して同じストリームを処理していると見なされます。これらのワーカーは複数のインスタンスに分散している場合もあります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。

  • KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーシャードマッピングなど) を維持します。各アプリケーションには、それぞれの DynamoDB テーブルがあります。詳細については、「リーステーブルを使用して KCL コンシューマアプリケーションによって処理されたシャードを追跡する」を参照してください。

認証情報を設定する

あなたは自分のものを作らなければなりませんAWSデフォルトの認証情報プロバイダーチェーンのいずれかの認証情報プロバイダーに対して、使用可能な認証情報。たとえば、EC2 インスタンスでコンシューマーを実行している場合は、IAM ロールでインスタンスを起動することをお勧めします。AWSこの IAM ロールに関連付けられたアクセス許可を反映する認証情報は、インスタンスメタデータを通じて、インスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーの認証情報を管理するための最も安全な方法です。

サンプルアプリケーションは、最初にインスタンスメタデータから IAM 認証情報を取得しようとします。

credentialsProvider = new InstanceProfileCredentialsProvider();

サンプルアプリケーションは、インスタンスメタデータから認証情報を取得できない場合、properties ファイルから認証情報を取得しようとします。

credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();

インスタンスのメタデータの詳細については、「」を参照してください。インスタンスメタデータLinux インスタンス用 Amazon EC2 ユーザーガイド

複数のインスタンスへのワーカー ID の使用

サンプルの初期化コードは、次のコードスニペットに示すように、ローカルコンピュータ名にグローバル一意識別子を追加して、ワーカーの ID (workerId) を作成します。このアプローチによって、1 台のコンピュータでコンシューマーアプリケーションの複数のインスタンスを実行するシナリオに対応できます。

String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

レコードプロセッサインターフェイスのバージョン 2 への移行

オリジナルインターフェースで使われるコードを移行するためには、上記のステップに加えて、次の手順が必要となります。

  1. レコードプロセッサのクラスを変更して、バージョン 2 レコードプロセッサインターフェイスにインポートします。

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  2. コンテナオブジェクトで get メソッドを使用するには、入力するリファレンスを変更します。たとえば、shutdown() オペレーションで、checkpointershutdownInput.getCheckpointer() に変更します。

  3. レコードプロセッサのファクトリークラスを変更して、バージョン 2 レコードプロセッサファクトリーインターフェイスにインポートします。

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  4. ワーカーのコンストラクチャを変更して、Worker.Builder を使います。例:

    final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();