AWS SDK for Java を使用したスループット共有カスタムコンシューマーの開発 - Amazon Kinesis Data Streams

「翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。」

AWS SDK for Java を使用したスループット共有カスタムコンシューマーの開発

カスタム開発の手法の1つ Kinesis Data Streams 消費者は、 Amazon Kinesis Data Streams API。このセクションでは、 Kinesis Data Streams AWS SDK for Javaを使用したAPI。このセクションのJavaサンプルコードは、基本的なKDS API操作の実行方法を示しており、操作タイプ別に論理的に分割されています。

この例に示すコードは、本稼働環境に使用できるコードではありません。考えられる例外は確認されておらず、想定されるセキュリティやパフォーマンスも考慮されていません。

には、 Kinesis Data Streams 他の異なるプログラミング言語を使用するAPI。すべての利用可能な AWS SDK の詳細については、「アマゾン ウェブ サービスを使用した開発の開始」を参照してください。

重要

カスタム開発の推奨方法 Kinesis Data Streams Kinesisクライアントライブラリ(KCL)を使用することです。KCLは、分散コンピューティングに関連する多くの複雑なタスクに対処することで、Kinesisデータストリームからデータを消費および処理するのに役立ちます。詳細については、以下を参照してください。 KCLを使用した共有スループットによるカスタム・コンシューマの開発.

ストリームからのデータの取得

は、 Kinesis Data Streams APIには、 getShardIterator および getRecords データ ストリームからレコードを取得するために呼び出すことができるメソッド。これはプルモデルで、コードがデータストリームのシャードから直接データレコードを描画します。

重要

のレコードプロセッササポートを使用することをお勧めします。 KCL データストリームからレコードを取得できます。これはプッシュモデルで、データを処理するコードを実装します。は、 KCL は、データ ストリームからデータ レコードを取得し、アプリケーション コードに配信します。さらに、KCL には、フェイルオーバー、リカバリ、負荷分散の機能が用意されています。詳細については、以下を参照してください。 KCLを使用した共有スループットによるカスタム・コンシューマの開発.

ただし、 Kinesis Data Streams API。たとえば、データ・ストリームを監視またはデバッグするためのカスタム・ツールを実装します。

重要

Kinesis Data Streams は、データストリームのデータレコードの保持期間の変更をサポートしています。詳細については、「データ保持期間の変更」を参照してください。

シャードイテレーターを使用する

ストリームからシャード単位でレコードを取得します。シャードごとに、そのシャードから取得するレコードのバッチごとに、シャードイテレーターを取得する必要があります。シャードイテレーターを getRecordsRequest オブジェクトで使用して、レコードの取得元になるシャードを指定します。シャードイテレーターに関連付ける型により、シャード内でレコードの取得元になる位置が決まります (詳細についてはこのセクションの後半を参照)。「DescribeStreamAPI - 廃止」で説明したように、シャードイテレーターを使用する前にシャードを取得する必要が あります。

最初のシャードイテレーターは、getShardIterator メソッドを使用して取得します。レコードのその他のバッチのシャードイテレーターは、getNextShardIterator メソッドによって返された getRecordsResult オブジェクトの getRecords メソッドを使用して取得します。シャードイテレーターは 5 分間有効です。有効な間にシャードイテレーターを使用すると、新しいシャードイテレーターを取得します。使用された後でも、各シャードイテレーターは 5 分間有効です。

最初のシャードイテレーターを取得するには、GetShardIteratorRequest をインスタンス化し、getShardIterator メソッドに渡します。リクエストを設定するには、ストリームとシャード ID を指定する必要があります。AWS アカウントのストリームを取得する方法については、「ストリームのリスト」を参照してください。ストリーム内のシャードを取得する方法については、「DescribeStreamAPI - 廃止」を参照してください。

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

このサンプルコードでは、最初のシャードイテレーターを取得するときにイテレーター型として TRIM_HORIZON を指定しています。このイテレーター型を指定することで、レコードはまず、シャードに直近に追加されたレコード (tip) からではなく、シャードに最初に追加されたレコードから返されます。イテレーターの種類は次のとおりです。

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

詳細については、「ShardIteratorType」を参照してください。

イテレーター型によっては、型に加えてシーケンス番号を指定する必要があります。以下に例を示します。

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

getRecords を使用してレコードを取得したら、レコードの getSequenceNumber メソッドを呼び出して、レコードのシーケンス番号を取得できます。

record.getSequenceNumber()

さらに、データストリームにレコードを追加するコードでは、getSequenceNumber の結果に対して putRecord を呼び出すことで、追加したレコードのシーケンス番号を取得できます。

lastSequenceNumber = putRecordResult.getSequenceNumber();

シーケンス番号を使用すると、レコードの順番が厳密に増えるようにできます。詳細については、「PutRecord の例」のサンプルコードを参照してください。

GetRecords を使用する

シャードイテレーターを取得したら、GetRecordsRequest オブジェクトをインスタンス化します。setShardIterator メソッドを使用してリクエストのイテレーターを指定します。

必要に応じて、setLimit メソッドを使用して、取得するレコードの数を設定することもできます。getRecords によって返されるレコードの数は常にこの制限以下になります。この制限を指定しない場合、getRecords は取得したレコードの 10 MB を返します。次のサンプルコードでは、この制限を 25 個のレコードに設定しています。

レコードが返されない場合、シャードイテレーターによって参照されたシーケンス番号では、このシャードからどのデータレコードも現在使用できないことになります。この状況では、ストリームのデータソースに対して、アプリケーションを適切な時間 (1 秒以上) 待機状態にする必要があります。次に、前の呼び出しで返されたシャードイテレータを使用して、シャードからデータを再度取得します。 getRecords。 レコードがストリームに追加された時点から、そのレコードが getRecords.

getRecordsRequest メソッドに getRecords を渡し、getRecordsResult オブジェクトとして返された値をキャプチャします。データレコードを取得するには、getRecords オブジェクトの getRecordsResult メソッドを呼び出します。

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

getRecords の別の呼び出しに備えて、getRecordsResult から次のシャードイテレーターを取得します。

shardIterator = getRecordsResult.getNextShardIterator();

最良の結果を得るために、getRecords の呼び出し間の 1 秒 (1,000 ミリ秒) 以上はスリープ状態にし、getRecords の頻度制限を超えないようにしてください。

try { Thread.sleep(1000); } catch (InterruptedException e) {}

一般的に、テストシナリオで 1 つのレコードを取得するときでも、getRecords はループ内で呼び出す必要があります。getRecords の 1 回の呼び出しでは、後続のシーケンス番号でシャード内にレコードがある場合でも、空のレコードのリストが返されることがあります。この状況になった場合は、空のレコードのリストと共に返された NextShardIterator によってシャード内の後続のシーケンス番号が参照されて、続く getRecords の呼び出しによって最終的にレコードが返されます。次のサンプルでは、ループの使用を示しています。

例: getRecords

以下のコード例には、このセクションで示した getRecords のヒント (ループ内での呼び出しなど) を反映しています。

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Kinesis Client Library を使用している場合は、データを返す前に複数回呼び出しが行われる場合があります。この動作は仕様であり、KCL やデータの問題を示すものではありません。

リシャーディングに適応する

次の場合: getRecordsResult.getNextShardIterator 返品 null、このシャードに関連するシャード分割またはマージが発生したことを示します。このシャードは現在 CLOSED このシャードから利用可能なデータ記録をすべて読み取った状態。

このシナリオでは、 getRecordsResult.childShards 分割またはマージによって作成された、処理中のシャードの新しい子シャードについて学習します。詳細については、以下を参照してください。 子シャード.

分割の場合、2 つの新しいシャードの parentShardId はいずれも、前に処理されたシャードの ID に一致します。これらのシャードの adjacentParentShardId の値はいずれも null です。

結合の場合、結合によって作成された 1 つの新しいシャードの parentShardId は、親のシャードのいずれかの ID に一致し、adjacentParentShardId は、その他の親シャードの ID に一致します。アプリケーションはこれらのいずれかのシャードからすべてのデータを読み取り済みです。これは、 getRecordsResult.getNextShardIterator 返却済み null。 データの順序がアプリケーションにとって重要である場合は、マージによって作成された子シャードから新しいデータを読み出す前に、他の親シャードからすべてのデータも読み取っていることを確認してください。

複数のプロセッサを使用してストリームからデータを取得し (たとえば、シャードごとに 1 つのプロセッサ)、シャードの分割または結合を行う場合、プロセッサの数を増減して、シャードの数の変化に適応させます。

シャードの状態 (CLOSED など) の説明を含むリシャーディングの詳細については、「ストリームをリシャーディングする」を参照してください。