Sviluppo di applicazioni consumer con fan-out avanzato con l'API di Flusso dati Kinesis - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Sviluppo di applicazioni consumer con fan-out avanzato con l'API di Flusso dati Kinesis

Il fan-out avanzato è una funzionalità di Flusso di dati Amazon Kinesis che consente ai consumer di ricevere dati da un flusso di dati con velocità di trasmissione effettiva dedicata fino a 2 MiB di dati al secondo per partizione. Un'applicazione consumer che utilizza il fan-out avanzato non è in competizione con altre applicazioni che ricevono dati dal flusso. Per ulteriori informazioni, consulta Sviluppo di consumatori personalizzati con throughput dedicato (fan-out avanzato).

È possibile utilizzare le operazioni API per creare un'applicazione consumer che utilizza il fan-out avanzato in Flusso di dati Kinesis.

Registrazione di un'applicazione consumer con il fan-out avanzato mediante l'API di Flusso di dati Kinesis
  1. Chiama RegisterStreamConsumer per registrare l'applicazione come consumer che utilizza il fan-out avanzato. Flusso di dati Kinesis genera un nome della risorsa Amazon (ARN) per il consumer e lo restituisce nella risposta.

  2. Per iniziare l'ascolto di una determinata partizione, inoltrare l'ARN dell'applicazione consumer in una chiamata a SubscribeToShard. Flusso di dati Kinesis inizia a inviare i record dalla partizione all'utente, sotto forma di eventi di tipo SubscribeToShardEvent su una connessione HTTP/2. La connessione rimane aperta per un massimo di 5 minuti. Chiama SubscribeToShard nuovamente se desideri continuare a ricevere record dallo shard dopo che il future restituito dalla chiamata a SubscribeToShard viene completato in maniera normale o eccezionale.

    Nota

    L'API SubscribeToShard restituisce anche l'elenco delle partizioni secondarie della partizione corrente quando viene raggiunta la fine della partizione corrente.

  3. Per annullare la registrazione di un'applicazione consumer che utilizza il fan-out avanzato, chiama DeregisterStreamConsumer.

Il seguente codice è un esempio di come è possibile sottoscrivere l'applicazione consumer a uno shard, rinnovare la sottoscrizione periodicamente e gestire gli eventi.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

Se event.ContinuationSequenceNumber restituisce null, indica che si è verificata una divisione o un'unione della partizione che ha interessato questa partizione. Questa partizione si trova ora nello stato CLOSED e hai letto tutti i record di dati disponibili da questa partizione. In questo scenario, è possibile utilizzare event.childShards per conoscere le nuove partizioni secondarie della partizione in fase di elaborazione che sono state create dalla divisione o dall'unione. Per ulteriori informazioni, consulta ChildShard.