AWS SDK for Java を使用した共有スループットでのカスタムコンシューマーの開発 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS SDK for Java を使用した共有スループットでのカスタムコンシューマーの開発

全体で共有されたるカスタム Kinesis Data Streams コンシューマーを開発する方法の 1 つは、Amazon Kinesis Data Streams API を使用することです。このセクションでは、AWS SDK for Java での Kinesis Data Streams API の使用について説明します。このセクションで紹介する Java サンプルコードは、基本的な KDS API オペレーションを実行する方法を示しており、オペレーションタイプ別に論理的に分割されています。

これらのサンプルコードは、本稼働環境対応のコードではありません。考えられる例外のすべてを確認するものではなく、潜在的なセキュリティまたはパフォーマンス事項も考慮されていません。

また、他のプログラミング言語を使用して Kinesis Data Streams API を呼び出すこともできます。すべての利用可能な AWS SDK の詳細については、Amazon Web Services を使用した開発の開始を参照してください。

重要

全体で共有されるカスタム Kinesis Data Streams コンシューマーを開発するには、Kinesis Client Library (KCL) を使用することをお勧めします。KCL は、分散コンピューティングに関連する複雑なタスクの多くを処理することで、Kinesis Data Streams からデータを消費および処理するのに役立ちます。詳細については、KCL を使用したスループット共有カスタムコンシューマーの開発を参照してください。

ストリームからのデータの取得

Kinesis Data Streams API には、データストリームからレコードを取得するために呼び出すことができる getShardIterator および getRecords メソッドが含まれています。これはプルモデルで、コードはデータストリームのシャードからデータを直接取得します。

重要

KCL によって提供されているレコードプロセッサのサポートを使用して、データストリームからレコードを取得することをお勧めします。これは、データを処理するコードを組み込むプッシュモデルです。KCL は、ストリームからデータレコードを取り出し、アプリケーションコードに配信します。さらに、CL には、フェイルオーバー、リカバリ、負荷分散の機能が用意されています。詳細については、KCL を使用したスループット共有カスタムコンシューマーの開発を参照してください。

ただし、状況によっては Kinesis Data Streams API を使用した方がよい場合があります。例えば、データストリームのモニタリングやデバッグのためのカスタムツールを実装する場合です。

重要

Kinesis Data Streams は、データストリームのデータレコードの保持期間の変更をサポートしています。詳細については、データ保持期間の変更を参照してください。

シャードイテレーターの使用

レコードは、シャード単位でストリームから取得します。シャードごと、およびそのシャードから取得するレコードのバッチごとに、シャードイテレーターを取得する必要があります。シャードイテレーターは、レコードの取得元になるシャードを指定するために getRecordsRequest オブジェクトで使用します。シャードイテレーターに関連付けられるタイプによって、レコードを取得するシャード内の位置が決まります (詳細については、このセクションの後半を参照してください)。DescribeStream API-非推奨で説明したように、シャードイテレーターを使用する前にシャードを取得する必要が あります。

最初のシャードイテレーターは、getShardIterator メソッドを使用して取得します。追加のレコードバッチのシャードイテレーターは、getNextShardIterator メソッドによって返された getRecordsResult オブジェクトの getRecords メソッドを使用して取得します。シャードイテレーターの有効時間は 5 分間です。有効時間内にシャードイテレーターを使用すると、新しいシャードイテレーターが取得されます。各シャードイテレーターは、使用後も 5 分間有効です。

最初のシャードイテレーターを取得するには、GetShardIteratorRequest をインスタンス化してから、getShardIterator メソッドに渡します。リクエストを設定するには、ストリームとシャード ID を指定します。AWS アカウントのストリームを取得する方法については、ストリームのリストを参照してください。ストリーム内のシャードを取得する方法については、DescribeStream API-非推奨を参照してください。

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

このサンプルコードは、最初のシャードイテレーターを取得するときのイテレータータイプとして TRIM_HORIZON を指定しています。このイテレーター型を指定することで、レコードはまず、シャードに直近に追加されたレコード (tip) からではなく、シャードに最初に追加されたレコードから返されます。以下は、使用可能なイテレータータイプです。

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

詳細については、ShardIteratorTypeを参照してください。

イテレータータイプによっては、タイプに加えてシーケンス番号を指定する必要があります。以下がその例です。

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

getRecords を使用してレコードを取得したら、レコードの getSequenceNumber メソッドを呼び出すことによって、レコードのシーケンス番号を取得できます。

record.getSequenceNumber()

さらに、データストリームにレコードを追加するコードは、getSequenceNumber の結果に対して putRecord を呼び出すことで、追加されたレコードのシーケンス番号を取得できます。

lastSequenceNumber = putRecordResult.getSequenceNumber();

シーケンス番号は、レコードの順序が厳密に増加することを保証するために使用できます。詳細については、PutRecord の例のサンプルコードを参照してください。

GetRecords の使用

シャードイテレーターを取得したら、GetRecordsRequest オブジェクトをインスタンス化します。リクエストのイテレーターは、setShardIterator メソッドを使用して指定します。

オプションとして、setLimit メソッドを使用することで、取得するレコードの数を設定することもできます。getRecords が返すレコードの数は、常にこの制限以下になります。この制限を指定しない場合、getRecords は取得したレコードの 10 MB を返します。次のサンプルコードは、この制限を 25 レコードに設定します。

レコードが返されない場合、シャードイテレーターが参照するシーケンス番号には、このシャードから現在利用できるデータレコードが存在しないことを意味します。この状況では、アプリケーションが、ストリームのデータソースに対して適切な時間を待機する必要があります。その後、getRecords への以前のコールで返されたシャードイテレーターを使用して、シャードからのデータの取得を再試行します。

getRecordsRequest メソッドに getRecords を渡し、返された値を getRecordsResult オブジェクトとしてキャプチャします。データレコードを取得するには、getRecords オブジェクトに対して getRecordsResult メソッドを呼び出します。

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

getRecords への別のコールを準備するために、getRecordsResult から次のシャードイテレーターを取得します。

shardIterator = getRecordsResult.getNextShardIterator();

最良の結果を得るには、getRecords へのコール間で少なくとも 1 秒間 (1,000 ミリ秒) スリープして、getRecords の頻度制限を超えないようにしてください。

try { Thread.sleep(1000); } catch (InterruptedException e) {}

一般的に、テストシナリオで単一のレコードを取得している場合でも、getRecords はループで呼び出す必要があります。getRecords への単一のコールは、後続のシーケンス番号ではシャード内に複数のレコードがあるという場合でも、空のレコードリストを返す可能性があります。この状況が発生すると、空のレコードリストとともに返された NextShardIterator が、シャード内の後続のシーケンス番号を参照し、最終的には連続する getRecords コールがレコードを返します。次のサンプルは、ループの使用を表すものです。

例: getRecords

以下のコード例には、このセクションで説明した getRecords のヒント (ループでの呼び出しなど) が反映されています。

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Kinesis Client Library を使用している場合は、データを返す前に複数回呼び出しが行われる場合があります。この動作は仕様であり、KCL やデータの問題を示すものではありません。

リシャーディングへの適応

getRecordsResult.getNextShardIteratornull を返す場合、このシャードに関係するシャード分割またはマージが発生したことを示します。このシャードは現在 CLOSED 状態であり、このシャードから使用可能なすべてのデータレコードを読み込んでいます。

このシナリオでは、getRecordsResult.childShards を使用して、分割またはマージによって作成された、処理中のシャードの新しい子シャードについて学習することができます。詳細については、ChildShardを参照してください。

分割の場合は、2 つの新しいシャードの両方に、以前処理していたシャードのシャード ID に等しい parentShardId があります。adjacentParentShardId の値は、これらのシャード両方で null になります。

マージの場合は、マージによって作成された単一の新しいシャードに、一方の親シャードの ID に等しい parentShardId と、もう一方の親シャードのシャード ID に等しい adjacentParentShardId があります。アプリケーションはこれらのいずれかのシャードからすべてのデータを読み取り済みです。これは getRecordsResult.getNextShardIterator から null が返されたシャードです。アプリケーションでデータの順序が重要である場合、結合によって作成された子シャードから新しいデータを読み取る前に、その他の親シャードからもすべてのデータを読み取るようにする必要があります。

複数のプロセッサを使用してストリームからデータを取得し (たとえば、シャードごとに 1 つのプロセッサ)、シャードの分割または結合を行う場合、プロセッサの数を増減して、シャードの数の変化に適応させます。

シャードの状態 (CLOSED など) の説明を含むリシャーディングの詳細については、ストリームをリシャーディングするを参照してください。

AWS Glue スキーマレジストリを使用してデータと相互作用する

Kinesis Data Streams を、AWS Glue スキーマレジストリと統合することができます。AWS Glue スキーマレジストリを使用すると、スキーマを一元的に検出、制御、および進化させながら、生成されたデータが登録されたスキーマによって継続的に検証されるようにできます。スキーマは、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。AWS Glue スキーマレジストリを使用すると、ストリーミングアプリケーション内のエンドツーエンドのデータ品質とデータガバナンスを改善できます。詳細については、AWS Glue スキーマレジストリを参照してください。この統合を設定する方法の 1 つは、AWS Java SDK で利用可能な GetRecords Kinesis Data Streams API を使用することです。

GetRecords Kinesis Data Streams API を使用して Kinesis Data Streams とスキーマレジストリの統合を設定する方法の詳細については、ユースケース: Amazon Kinesis Data Streams と AWS Glue スキーマレジストリの統合のKinesis Data Streams API を使用したデータの操作セクションを参照してください。