Mengembangkan Konsumen Fan-Out yang Ditingkatkan dengan API Kinesis Data Streams - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Mengembangkan Konsumen Fan-Out yang Ditingkatkan dengan API Kinesis Data Streams

Ditingkatkan fan-outadalah fitur Amazon Kinesis Data Streams yang memungkinkan konsumen menerima rekaman dari aliran data dengan throughput khusus hingga 2 MB data per detik per pecahan. Konsumen yang menggunakan fan-out yang disempurnakan tidak harus bersaing dengan konsumen lain yang menerima data dari aliran. Untuk informasi selengkapnya, lihat Mengembangkan Konsumen Khusus dengan Throughput Khusus (Peningkatan Fan-Out).

Anda dapat menggunakan operasi API untuk membangun konsumen yang menggunakan fan-out yang disempurnakan di Kinesis Data Streams.

Untuk mendaftarkan konsumen dengan fan-out yang disempurnakan menggunakan Kinesis Data Streams API
  1. PANGLANRegisterStreamConsumeruntuk mendaftarkan aplikasi Anda sebagai konsumen yang menggunakan fan-out yang disempurnakan. Kinesis Data Streams menghasilkan Amazon Resource Name (ARN) untuk konsumen dan mengembalikannya dalam respons.

  2. Untuk mulai mendengarkan pecahan tertentu, lulus ARN konsumen dalam panggilan keSubscribeToShard. Kinesis Data Streams kemudian mulai mendorong catatan dari pecahan itu kepada Anda, dalam bentuk peristiwa tipeSubscribeToShardEventmelalui koneksi HTTP/2. Sambungan tetap terbuka hingga 5 menit. PANGLANSubscribeToShardlagi jika Anda ingin terus menerima catatan dari beling setelahfutureyang dikembalikan oleh panggilan keSubscribeToShardmelengkapi secara normal atau sangat.

    catatan

    SubscribeToShardAPI juga mengembalikan daftar pecahan anak dari pecahan saat ini ketika akhir pecahan saat ini tercapai.

  3. Untuk membatalkan pendaftaran konsumen yang menggunakan fan-out ditingkatkan, panggilanDeregisterStreamConsumer.

Kode berikut adalah contoh bagaimana Anda dapat berlangganan konsumen Anda ke pecahan, memperbarui langganan secara berkala, dan menangani acara.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

Jikaevent.ContinuationSequenceNumberpulangnull, ini menunjukkan bahwa pecahan pecahan atau penggabungan telah terjadi yang melibatkan pecahan ini. beling ini sekarang dalamCLOSEDnegara, dan Anda telah membaca semua catatan data yang tersedia dari beling ini. Dalam skenario ini, per contoh di atas, Anda dapat menggunakanevent.childShardsuntuk belajar tentang pecahan anak baru dari beling yang sedang diproses yang diciptakan oleh split atau merge. Untuk informasi selengkapnya, lihatChildShard.