Amazon Kinesis Data Streams
開発者ガイド

Kinesis Data Streams API を使用して拡張ファンアウトでコンシューマーを開発する

拡張ファンアウトは、Amazon Kinesis Data Streams の機能です。この機能を使用すると、コンシューマーは、シャードあたり 1 秒間に最大 2 MiB のデータの専用スループットで、データストリームからレコードを受け取ることができます。拡張ファンアウトを使用するコンシューマーは、ストリームからデータを受け取っている他のコンシューマーと競合する必要はありません。詳細については、「拡張ファンアウトでコンシューマーを使用する 」を参照してください。

拡張ファンアウトを Kinesis Data Streams で使用するコンシューマを構築するには、API オペレーションを使用します。

Kinesis Data Streams API を使用して拡張ファンアウトでコンシューマーを登録するには

  1. 拡張ファンアウトを使用するコンシューマーとして RegisterStreamConsumer 呼び出してアプリケーションを登録します。Kinesis Data Streams は、コンシューマーの Amazon リソースネーム (ARN) を生成し、それをレスポンスで返します。

  2. 特定のシャードに対するリスニングを開始するには、SubscribeToShard を呼び出してコンシューマー ARN を渡します。これにより、Kinesis Data Streams は、そのシャードのレコードをユーザーにプッシュします。レコードは、HTTP/2 接続経由で SubscribeToShardEvent 型のイベントの形式でプッシュされます。接続は最大 5 分間開いたままです。SubscribeToShard への呼び出しによって返される future が正常または例外的に完了した後も、引き続きシャードからレコードを受け取る場合は、SubscribeToShard を再度呼び出します。

  3. 拡張ファンアウトを使用しているコンシューマーの登録を解除するには、DeregisterStreamConsumer を呼び出します。

次のコードは、シャードへのコンシューマーのサブスクライブ、サブスクリプションの定期更新、イベントの処理を行う方法の例です。

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }