Développez un plus grand nombre de fans grâce aux Kinesis Data Streams API - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Développez un plus grand nombre de fans grâce aux Kinesis Data Streams API

La diffusion améliorée est une fonction Amazon Kinesis Data Streams permettant aux applications consommateur de recevoir des enregistrements provenant d'un flux de données avec un débit dédié de jusqu'à 2 Mo de données par seconde par partition. Une application consommateur utilisant la diffusion améliorée n'a pas besoin de se heurter à d'autres applications consommateur qui reçoivent des données à partir du flux. Pour plus d’informations, consultez Développez des consommateurs personnalisés avec un débit dédié (fan-out amélioré).

Vous pouvez utiliser API les opérations pour créer un consommateur qui utilise le ventilateur amélioré dans Kinesis Data Streams.

Pour inscrire un consommateur à un système de ventilation amélioré à l'aide des Kinesis Data Streams API
  1. Appelez RegisterStreamConsumerpour enregistrer votre application en tant que client utilisant un ventilateur amélioré. Kinesis Data Streams génère un Amazon Resource Name ARN () pour le consommateur et le renvoie dans la réponse.

  2. Pour commencer à écouter un fragment spécifique, transmettez le consommateur ARN dans un appel à SubscribeToShard. Kinesis Data Streams commence alors à vous transmettre les enregistrements de cette partition, sous la forme d'événements de SubscribeToShardEventtype 2 via HTTP une connexion /2. La connexion demeure ouvert pour une durée maximum de 5 minutes. Appelez SubscribeToShardà nouveau si vous souhaitez continuer à recevoir des enregistrements de la partition une fois future que l'appel les a renvoyés SubscribeToShardnormalement ou exceptionnellement.

    Note

    SubscribeToShardAPIrenvoie également la liste des fragments enfants de la partition actuelle lorsque la fin de la partition actuelle est atteinte.

  3. Pour annuler l'enregistrement d'un consommateur qui utilise le ventilateur amélioré, appelez. DeregisterStreamConsumer

Le code suivant est un exemple de la façon dont vous pouvez abonner votre application consommateur à une partition, renouveler l'abonnement périodiquement et gérer les événements.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

Si event.ContinuationSequenceNumber renvoie null, cela indique qu'une division ou une fusion de partition a eu lieu impliquant cette partition. Cette partition est maintenant dans un état CLOSED, et vous avez lu tous les enregistrements de données disponibles à partir de cette partition. Dans ce scénario, comme indiqué ci-dessus, vous pouvez utiliser event.childShards pour en savoir plus sur les nouvelles partitions secondaires de la partition en cours de traitement qui ont été créées par la scission ou la fusion. Pour plus d'informations, consultez ChildShard.