Kinesis Data Streams を使用して拡張ファンアウトコンシューマーを開発する API - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis Data Streams を使用して拡張ファンアウトコンシューマーを開発する API

拡張ファンアウトは Amazon Kinesis Data Streams の機能です。この機能を使用すると、コンシューマーは、シャードあたり 1 秒間に最大 2 MB のデータの専用スループットで、データストリームからレコードを受け取ることができます。拡張ファンアウトを使用するコンシューマーは、ストリームからデータを受け取っている他のコンシューマーと競合する必要はありません。詳細については、「専用スループットでカスタムコンシューマーを開発する (拡張ファンアウト)」を参照してください。

API オペレーションを使用して、Kinesis Data Streams で拡張ファンアウトを使用するコンシューマーを構築できます。

Kinesis Data Streams を使用して拡張ファンアウトでコンシューマーを登録するには API
  1. RegisterStreamConsumer を呼び出して、拡張ファンアウトを使用するコンシューマーとしてアプリケーションを登録します。Kinesis Data Streams は、コンシューマーの Amazon リソースネーム (ARN) を生成し、レスポンスで返します。

  2. 特定のシャードのリッスンを開始するには、コンシューマーを への呼び出しARNに渡しますSubscribeToShard。その後、Kinesis Data Streams は、HTTP/2 接続SubscribeToShardEventを介して タイプのイベント形式で、そのシャードからのレコードのプッシュを開始します。接続は最大 5 分間開いたままです。の呼び出しによって返された が正常に完了するか例外的にSubscribeToShard完了した後future、シャードからレコードを受信し続ける場合は、 SubscribeToShard を再度呼び出します。

    注記

    SubscribeToShard API は、現在のシャードの末尾に達すると、現在のシャードの子シャードのリストも返します。

  3. 拡張ファンアウトを使用しているコンシューマーの登録を解除するには、 を呼び出しますDeregisterStreamConsumer

次のコードは、シャードへのコンシューマーのサブスクライブ、サブスクリプションの定期更新、イベントの処理を行う方法の例です。

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

event.ContinuationSequenceNumbernull を返す場合、このシャードに関係するシャード分割またはマージが発生したことを示します。このシャードは現在 CLOSED 状態であり、このシャードから使用可能なすべてのデータレコードを読み込んでいます。このシナリオでは、上記の例のように、event.childShards を使用して、分割またはマージによって作成された、処理中のシャードの新しい子シャードについて学習することができます。詳細については、「」を参照してくださいChildShard