Desarrollo de consumidores de distribución ramificada mejorada con la API de Kinesis Data Streams - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Desarrollo de consumidores de distribución ramificada mejorada con la API de Kinesis Data Streams

La distribución ramificada mejorada es una característica de Amazon Kinesis Data Streams que permite a los consumidores recibir registros de un flujo de datos con un rendimiento dedicado de hasta 2 MB de datos por segundo por partición. Un consumidor que utiliza la distribución ramificada mejorada no tiene que competir con otros consumidores que reciben datos de la secuencia. Para obtener más información, consulte Desarrollo y uso de consumidores personalizados con rendimiento dedicado (Distribución ramificada mejorada).

Puede utilizar las operaciones de la API para crear un consumidor que utilice la distribución ramificada mejorada en Kinesis Data Streams.

Para registrar un consumidor con distribución ramificada mejorada mediante la API de Kinesis Data Streams
  1. Llame a RegisterStreamConsumer para registrar la aplicación como un consumidor que utiliza la distribución ramificada mejorada. Kinesis Data Streams genera un nombre de recurso de Amazon (ARN) para el consumidor y lo devuelve en la respuesta.

  2. Para iniciar la escucha de una partición específica, traslade el ARN del consumidor en una llamada a SubscribeToShard. A continuación, Kinesis Data Streams comienza a enviarle los registros de esa partición en forma de eventos de tipo SubscribeToShardEvent a través de una conexión HTTP/2. La conexión permanece abierta durante un máximo de 5 minutos. Llame de nuevo a SubscribeToShard si desea continuar recibiendo registros del fragmento después de que el future que devuelve la llamada a SubscribeToShard finalice con normalidad o con excepciones.

    nota

    La API SubscribeToShard también devuelve la lista de las particiones secundarias de la partición actual cuando se alcanza el final de la partición actual.

  3. Para anular el registro de un consumidor que utiliza la distribución ramificada mejorada, llame a DeregisterStreamConsumer.

El código siguiente es un ejemplo de cómo suscribir el consumidor a un fragmento, renovar la suscripción de forma periódica y controlar los eventos.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

Si event.ContinuationSequenceNumber devuelve null, indica que se ha producido una división o combinación de una partición que ha implicado esta partición. Esta partición se encuentra ahora en un estado CLOSED, y se han leído todos los registros de datos disponibles de esta partición. En este escenario, según el ejemplo anterior, puede utilizar event.childShards para obtener información sobre las nuevas particiones secundarias de la partición que se procesa y que se crearon mediante la división o la combinación. Para obtener más información, consulte ChildShard.