Amazon Kinesis Data Streams API と AWS SDK for Java を使用したプロデューサーの開発 - Amazon Kinesis Data Streams

「翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。」

Amazon Kinesis Data Streams API と AWS SDK for Java を使用したプロデューサーの開発

Amazon Kinesis Data Streams API と AWS SDK for Java を使用してプロデューサーを開発できます。Kinesis Data Streams を初めて利用する場合は、「Amazon Kinesis Data Streams とは」と「Amazon Kinesis Data Streams のご利用開始にあたって」に説明されている概念と用語について理解することから始めてください。

以下の例では、Kinesis Data Streams API について説明し、AWS SDK for Java を使用してストリームにデータを追加 (入力) します。ただし、ほとんどのユースケースでは、Kinesis Data Streams KPL ライブラリを使用します。詳細については、「Amazon Kinesis Producer Library を使用したプロデューサーの開発」を参照してください。

この章で紹介する Java サンプルコードは、基本的な Kinesis Data Streams API オペレーションを実行する方法を示しており、オペレーションタイプ別に論理的に分割されています。これらのサンプルは、すべての例外を確認しているわけではなく、すべてのセキュリティやパフォーマンスの側面を考慮しているわけでもない点で、本稼働環境に使用できるコードを表すものではありません。また、他のプログラミング言語を使用して Kinesis Data Streams​ API​ を呼び出すこともできます。利用可能なすべての AWS SDKs の詳細については、「アマゾン ウェブ サービスを使用した開発の開始」を参照してください。

各タスクには前提条件があります。たとえば、ストリームを作成するまではストリームにデータを追加できず、ストリームを作成するにはクライアントを作成する必要があります。詳細については、「ストリームの作成と管理」を参照してください。

ストリームへのデータの追加

ストリームを作成したら、レコードの形式でストリームにデータを追加できます。レコードはデータ BLOB の形式で処理するデータを格納するデータ構造です。データをレコードに格納した後、Kinesis Data Streams ではいずれの方法でもデータが検査、解釈、または変更されることはありません。各レコードにはシーケンス番号とパーティションキーも関連付けられます。

API には、ストリームにデータを追加するオペレーションとして、Kinesis Data StreamsPutRecordsPutRecord の 2 つの異なるオペレーションがあります。 オペレーションは HTTP リクエストごとにストリームに複数のレコードを送信し、単数形の PutRecords オペレーションは一度に 1 つずつストリームにレコードを送信します (各レコードに個別の HTTP リクエストが必要です)。PutRecordデータプロデューサーあたりのスループットが向上するため、ほとんどのアプリケーションでは PutRecords を使用してください。これらの各オペレーションの詳細については、後のそれぞれのサブセクションを参照してください。

ソースアプリケーションは Kinesis Data Streams API を使用してストリームにデータを追加するため、1 つ以上のコンシューマーアプリケーションが同時にストリームからデータを取得して処理する可能性があることを常に念頭に置いてください。コンシューマーが Kinesis Data Streams API を使用してデータを取得する方法の詳細については、「ストリームからのデータの取得」を参照してください。

を使用した複数のレコードの追加PutRecords

PutRecords オペレーションは、1 つのリクエストで Kinesis Data Streams に複数のレコードを送信します。を使用することによって、プロデューサーは PutRecords にデータを送信するときに高スループットを実現できます。Kinesis data stream各 PutRecords リクエストで、最大 500 レコードをサポートできます。リクエストに含まれる各レコードは 1 MB のサイズ、リクエスト全体の上限はパーティションキーを含めて最大 5 MB。以下で説明する単一の PutRecord オペレーションと同様に、PutRecords はシーケンス番号とパーティションキーを使用します。ただし、PutRecordSequenceNumberForOrdering パラメータは、PutRecords の呼び出しには含まれません。PutRecords オペレーションでは、リクエストの自然な順序ですべてのレコードを処理するよう試みます。

各データレコードには一意のシーケンス番号があります。シーケンス番号は、client.putRecords を呼び出してストリームにデータレコードを追加した後に、Kinesis Data Streams によって割り当てられます。同じパーティションキーのシーケンス番号は一般的に、時間の経過とともに大きくなります。PutRecords リクエスト間の期間が長くなるほど、シーケンス番号は大きくなります。

注記

シーケンス番号は、同じストリーム内の一連のデータのインデックスとして使用することはできません。一連のデータを論理的に区別するには、パーティションキーを使用するか、データセットごとに個別のストリームを作成します。

PutRecords リクエストには、異なるパーティションキーのレコードを含めることができます。リクエストのスコープはストリームです。各リクエストには、リクエストの制限まで、パーティションキーとレコードのあらゆる組み合わせを含めることができます。複数の異なるパーティションキーを使用して、複数の異なるシャードを含むストリームに対して実行されたリクエストは、少数のパーティションキーを使用して少数のシャードに対して実行されたリクエストよりも一般的に高速です。レイテンシーを低減し、スループットを最大化するには、パーティションキーの数をシャードの数よりも大きくする必要があります。

PutRecords の例

次のコードでは、シーケンシャルなパーティションキーを持つ 100 件のデータレコードを作成し、DataStream という名前のストリームに格納しています。

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);

レスポンスには、レスポンスの PutRecords の配列が含まれます。Records レスポンス配列の各レコードは、リクエスト配列内のレコードと自然な順序 (リクエストやレスポンスの上から下へ) で直接相互に関連付けられます。レスポンスの Records 配列には、常にリクエスト配列と同じ数のレコードが含まれます。

使用時のエラーの処理PutRecords

デフォルトでは、リクエスト内の個々のレコードでエラーが発生しても、PutRecords リクエスト内のそれ以降のレコードの処理は停止されません。つまり、レスポンスの Records 配列には、正常に処理されたレコードと、正常に処理されなかったレコードの両方が含まれていることを意味します。正常に処理されなかったレコードを検出し、それ以降の呼び出しに含める必要があります。

正常に処理されたレコードには SequenceNumber 値と ShardID 値が、正常に処理されなかったレコードには ErrorCode 値と ErrorMessage 値が含まれます。パラメータはエラーのタイプを反映しており、次のいずれかの値になります。ErrorCodeProvisionedThroughputExceededException または InternalFailureErrorMessage は、ProvisionedThroughputExceededException 例外に関するより詳細な情報として、スロットリングされたレコードのアカウント ID、ストリーム名、シャード ID などを提供します。次の例では、PutRecords リクエストに 3 つのレコードがあります。2 番目のレコードは失敗し、レスポンスに反映されます。

例 PutRecords リクエストの構文

{ "Records": [ { "Data": "XzxkYXRhPl8w", "PartitionKey": "partitionKey1" }, { "Data": "AbceddeRFfg12asd", "PartitionKey": "partitionKey1" }, { "Data": "KFpcd98*7nd1", "PartitionKey": "partitionKey3" } ], "StreamName": "myStream" }

例 PutRecords レスポンスの構文

{ "FailedRecordCount”: 1, "Records": [ { "SequenceNumber": "21269319989900637946712965403778482371", "ShardId": "shardId-000000000001" }, { “ErrorCode":”ProvisionedThroughputExceededException”, “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111." }, { "SequenceNumber": "21269319989999637946712965403778482985", "ShardId": "shardId-000000000002" } ] }

正常に処理されなかったレコードは、以降の PutRecords リクエストに含めることができます。最初に、putRecordsResultFailedRecordCount パラメータを調べて、リクエスト内にエラーとなったレコードがあるかどうかを確認します。このようなレコードがある場合は、putRecordsEntryErrorCode 以外である各 null を、以降のリクエストに追加してください。このタイプのハンドラーの例については、次のコードを参照してください。

例 PutRecords エラーハンドラー

PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }

を使用した単一のレコードの追加PutRecord

PutRecord の各呼び出しは、1 つのレコードに対して動作します。アプリケーションで常にリクエストごとに 1 つのレコードを送信する必要がある場合や、PutRecords を使用できないその他の理由がある場合を除いて、「を使用した複数のレコードの追加PutRecords」で説明している PutRecords オペレーションを使用します。

各データレコードには一意のシーケンス番号があります。シーケンス番号は、client.putRecord を呼び出してストリームにデータレコードが追加された後に、Kinesis Data Streams によって割り当てられます。同じパーティションキーのシーケンス番号は一般的に、時間の経過とともに大きくなります。PutRecord リクエスト間の期間が長くなるほど、シーケンス番号は大きくなります。

入力が立て続けに行われた場合、返されるシーケンス番号は大きくなるとは限りません。入力オペレーションが基本的に Kinesis Data Streams に対して同時に実行されるためです。同じパーティションキーに対して厳密にシーケンス番号が大きくなるようにするには、PutRecord の例のサンプルコードに示しているように、SequenceNumberForOrdering パラメータを使用します。

SequenceNumberForOrdering を使用するかどうかにかかわらず、Kinesis Data Streams が GetRecords の呼び出しを通じて受け取るレコードは厳密にシーケンス番号順になります。

注記

シーケンス番号は、同じストリーム内の一連のデータのインデックスとして使用することはできません。一連のデータを論理的に区別するには、パーティションキーを使用するか、データセットごとに個別のストリームを作成します。

パーティションキーはストリーム内のデータをグループ化するために使用されます。データレコードはそのパーティションキーに基づいてストリーム内でシャードに割り当てられます。具体的には、Kinesis Data Streams ではパーティションキー (および関連するデータ) を特定のシャードにマッピングするハッシュ関数への入力として、パーティションキーを使用します。

このハッシュメカニズムの結果として、パーティションキーが同じすべてのデータレコードは、ストリーム内で同じシャードにマッピングされます。ただし、パーティションキーの数がシャードの数を超えている場合、一部のシャードにパーティションキーが異なるレコードが格納されることがあります。設計の観点から、すべてのシャードが適切に使用されるようにするには、シャードの数 (setShardCountCreateStreamRequest メソッドで指定) を一意のパーティションキーの数よりも大幅に少なくする必要があります。また、1 つのパーティションキーへのデータの流量をシャードの容量より大幅に小さくする必要があります。

PutRecord の例

以下のコードでは、2 つのパーティションキーに配分される 10 件のデータレコードを作成し、myStreamName という名前のストリームに格納しています。

for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }

上記のコード例では、setSequenceNumberForOrdering を使用して、各パーティションキー内で順番が厳密に増えるようにしています。このパラメータを効果的に使用するには、現在のレコード (レコード SequenceNumberForOrderingn) の を前述のレコード (レコード n-1) のシーケンス番号に設定します。ストリームに追加されたレコードのシーケンス番号を取得するには、getSequenceNumber の結果に対して putRecord を呼び出します。

パラメータは、同じパーティションキーのシーケンス番号が厳密に増えるようにします。SequenceNumberForOrderingSequenceNumberForOrdering では、複数のパーティションキーにわたるレコードの順序付けは行いません。

AWS Glue スキーマレジストリを使用したデータの操作

Kinesis データストリームを AWS Glue スキーマレジストリと統合できます。AWS Glue スキーマレジストリを使用すると、生成されたデータが登録されたスキーマによって継続的に検証されるようにしながら、スキーマを一元的に検出、制御、および進化させることができます。スキーマは、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、消費、またはストレージのバージョン管理された仕様です。AWS Glue スキーマレジストリを使用すると、ストリーミングアプリケーション内でのエンドツーエンドのデータ品質とデータガバナンスを向上させることができます。詳細については、「AWS Glue スキーマレジストリ」を参照してください。この統合を設定する方法の 1 つは、AWS Java SDK で利用可能な PutRecords および PutRecord Kinesis Data Streams APIs を使用することです。

および PutRecords Kinesis Data Streams PutRecord を使用してスキーマレジストリと Kinesis Data Streams の統合を設定する方法の詳細については、「APIsユースケースAPIs」の「Kinesis Data Streams を使用したデータの操作」セクションを参照してください。Amazon Kinesis データストリームと AWS Glue スキーマレジストリの統合