Amazon Kinesis Data Streams
開発者ガイド

Kinesis Data Streams API および AWS SDK for Java を使用したコンシューマーの開発

Amazon Kinesis Data Streams API と AWS SDK for Java を使用してコンシューマーを開発できます。Kinesis Data Streams を初めて利用する場合は、Amazon Kinesis Data Streams とは?Amazon Kinesis Data Streams の使用開始 に説明されている概念と用語について理解することから始めてください。

以下の例では、Kinesis Data Streams API について説明し、AWS SDK for Java を使用してストリームからデータを取得します。ただし、ほとんどのユースケースでは、Kinesis Client Library (KCL) ライブラリを使用します。詳細については、「Kinesis Client Library 1.x を使用したコンシューマーの開発」を参照してください。

このセクションで紹介する Java サンプルコードは、基本的な Kinesis Data Streams API オペレーションを実行する方法を示しており、オペレーションタイプ別に論理的に分割されています。この例に示すコードは、本稼働環境に使用できるコードではありません。考えられる例外は確認されておらず、想定されるセキュリティやパフォーマンスも考慮されていません。また、他のプログラミング言語を使用して Kinesis Data Streams API を呼び出すこともできます。利用可能なすべての AWS SDK の詳細については、「アマゾン ウェブ サービスを使用した開発の開始」を参照してください。

各タスクには前提条件があります。たとえば、ストリームを作成するまではストリームにデータを追加できず、ストリームを作成するにはクライアントを作成する必要があります。詳細については、「ストリームの作成と管理」を参照してください。

ストリームからのデータの取得

Kinesis Data Streams API には、ストリームからデータを取得する getShardIterator メソッドと getRecords メソッドが用意されています。これはプルモデルで、コードはストリームのシャードからデータを直接取得します。

Kinesis Client Library(KCL)で提供されているレコードプロセッサのサポートを使用して、コンシューマーアプリケーションのストリームデータを取得することをお勧めします。これは、データを処理するコードを組み込むプッシュ モデルです。KCL は、ストリームからデータレコードを取り出し、アプリケーションコードに配信します。さらに、KCL には、フェイルオーバー、リカバリ、負荷分散の機能が用意されています。詳細については、「Kinesis Client Library 1.x を使用したコンシューマーの開発」を参照してください。

ただし、状況によっては AWS SDK for Java とともに Kinesis Data Streams API を使用した方がよい場合があります。たとえば、ストリームのモニタリングやデバッグのためのカスタムツールを実装する場合です。

重要

Kinesis Data Streams は、データストリームのデータレコードの保持期間の変更をサポートしています。詳細については、「データ保持期間の変更」を参照してください。

シャードイテレーターを使用する

ストリームからシャード単位でレコードを取得します。シャードごとに、そのシャードから取得するレコードのバッチごとに、シャードイテレーターを取得する必要があります。シャードイテレーターを getRecordsRequest オブジェクトで使用して、レコードの取得元になるシャードを指定します。シャードイテレーターに関連付ける型により、シャード内でレコードの取得元になる位置が決まります (詳細についてはこのセクションの後半を参照)。「ストリームからシャードを取得する」で説明したように、シャードイテレーターを使用する前にシャードを取得する必要が あります。

最初のシャードイテレーターは、getShardIterator メソッドを使用して取得します。レコードのその他のバッチのシャードイテレーターは、getRecords メソッドによって返された getRecordsResult オブジェクトの getNextShardIterator メソッドを使用して取得します。シャードイテレーターは 5 分間有効です。有効な間にシャードイテレーターを使用すると、新しいシャードイテレーターを取得します。使用された後でも、各シャードイテレーターは 5 分間有効です。

最初のシャードイテレーターを取得するには、GetShardIteratorRequest をインスタンス化し、getShardIterator メソッドに渡します。リクエストを設定するには、ストリームとシャード ID を指定する必要があります。AWS アカウントのストリームを取得する方法については、「ストリームのリスト」を参照してください。ストリーム内のシャードを取得する方法については、「ストリームからシャードを取得する」を参照してください。

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

このサンプルコードでは、最初のシャードイテレーターを取得するときにイテレーター型として TRIM_HORIZON を指定しています。このイテレーター型を指定することで、レコードはまず、シャードに直近に追加されたレコード (tip) からではなく、シャードに最初に追加されたレコードから返されます。イテレーターの種類は次のとおりです。

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

詳細については、「ShardIteratorType」を参照してください。

イテレーター型によっては、型に加えてシーケンス番号を指定する必要があります。以下に例を示します。

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

getRecords を使用してレコードを取得したら、レコードの getSequenceNumber メソッドを呼び出して、レコードのシーケンス番号を取得できます。

record.getSequenceNumber()

さらに、データストリームにレコードを追加するコードでは、putRecord の結果に対して getSequenceNumber を呼び出すことで、追加したレコードのシーケンス番号を取得できます。

lastSequenceNumber = putRecordResult.getSequenceNumber();

シーケンス番号を使用すると、レコードの順番が厳密に増えるようにできます。詳細については、「PutRecord の例」のサンプルコードを参照してください。

GetRecords を使用する

シャードイテレーターを取得したら、GetRecordsRequest オブジェクトをインスタンス化します。setShardIterator メソッドを使用してリクエストのイテレーターを指定します。

必要に応じて、setLimit メソッドを使用して、取得するレコードの数を設定することもできます。getRecords によって返されるレコードの数は常にこの制限以下になります。この制限を指定しない場合、getRecords は取得したレコードの 10 MB を返します。次のサンプルコードでは、この制限を 25 個のレコードに設定しています。

レコードが返されない場合、シャードイテレーターによって参照されたシーケンス番号では、このシャードからどのデータレコードも現在使用できないことになります。この状況では、ストリームのデータソースに対して、アプリケーションを適切な時間 (1 秒以上) 待機状態にする必要があります。次に、getRecords の前の呼び出しで返されたシャードイテレーターを使用して、シャードからのデータの取得を再試行します。レコードがストリームに追加されてから getRecords で使用できるまでに約 3 秒のレイテンシーが発生します。

getRecords メソッドに getRecordsRequest を渡し、getRecordsResult オブジェクトとして返された値をキャプチャします。データレコードを取得するには、getRecordsResult オブジェクトの getRecords メソッドを呼び出します。

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

getRecords の別の呼び出しに備えて、getRecordsResult から次のシャードイテレーターを取得します。

shardIterator = getRecordsResult.getNextShardIterator();

最良の結果を得るために、getRecords の呼び出し間の 1 秒 (1,000 ミリ秒) 以上はスリープ状態にし、getRecords の頻度制限を超えないようにしてください。

try { Thread.sleep(1000); } catch (InterruptedException e) {}

一般的に、テストシナリオで 1 つのレコードを取得するときでも、getRecords はループ内で呼び出す必要があります。getRecords の 1 回の呼び出しでは、後続のシーケンス番号でシャード内にレコードがある場合でも、空のレコードのリストが返されることがあります。この状況になった場合は、空のレコードのリストと共に返された NextShardIterator によってシャード内の後続のシーケンス番号が参照されて、続く getRecords の呼び出しによって最終的にレコードが返されます。次のサンプルでは、ループの使用を示しています。

例: getRecords

以下のコード例には、このセクションで示した getRecords のヒント (ループ内での呼び出しなど) を反映しています。

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Kinesis Client Library を使用している場合は、データを返す前に複数回呼び出しが行われる場合があります。この動作は仕様であり、KCL やデータの問題を示すものではありません。

リシャーディングに適応する

getRecordsResult.getNextShardIterator によって null が返された場合、A シャードは分割または結合され、現在 CLOSED 状態であり、使用可能なすべてのデータレコードはこのシャードから読み取り済みであることを表します。

このシナリオでは、ストリーム内のシャードを再び列挙して、分割または結合によって作成された新しいシャードを取得する必要があります。

分割の場合、2 つの新しいシャードの parentShardId はいずれも、前に処理されたシャードの ID に一致します。これらのシャードの adjacentParentShardId の値はいずれも null です。

結合の場合、結合によって作成された 1 つの新しいシャードの parentShardId は、親のシャードのいずれかの ID に一致し、adjacentParentShardId は、その他の親シャードの ID に一致します。アプリケーションはこれらのいずれかのシャードからすべてのデータを読み取り済みです。これは、getRecordsResult.getNextShardIteratornull を返したシャードです。アプリケーションでデータの順序が重要である場合、結合によって作成された子シャードから新しいデータを読み取る前に、その他の親シャードからもすべてのデータを読み取るようにする必要があります。

複数のプロセッサを使用してストリームからデータを取得し (たとえば、シャードごとに 1 つのプロセッサ)、シャードの分割または結合を行う場合、プロセッサの数を増減して、シャードの数の変化に適応させます。

シャードの状態(CLOSED など)の説明を含むリシャーディングの詳細については、「ストリームをリシャーディングする」を参照してください。