Desenvolver consumidores de distribuição avançada com a API do Kinesis Data Streams - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolver consumidores de distribuição avançada com a API do Kinesis Data Streams

A distribuição avançada é um recurso do Amazon Kinesis Data Streams que permite que os consumidores recebam registros de um fluxo de dados com throughput dedicada de até 2 MB de dados por segundo por fragmento. Um consumidor que usa distribuição avançada não precisa lidar com outros consumidores que estejam recebendo dados do streaming. Para obter mais informações, consulte Desenvolver consumidores personalizados com throughput dedicada (distribuição avançada).

Você pode usar operações de API para criar um consumidor que usa a distribuição avançada no Kinesis Data Streams.

Para registrar um consumidor com distribuição avançada usando a API do Kinesis Data Streams
  1. Chame RegisterStreamConsumer para registrar a aplicação como um consumidor que usa distribuição avançada. O Kinesis Data Streams gera um nome do recurso da Amazon (ARN) para o consumidor e o retorna na resposta.

  2. Para começar a ouvir um determinado fragmento, passe o ARN do consumidor em uma chamada para SubscribeToShard. Em seguida, o Kinesis Data Streams começa a enviar os registros do fragmento para você, na forma de eventos do tipo SubscribeToShardEvent, por conexão HTTP/2. A conexão permanece aberta por até 5 minutos. Chame SubscribeToShard novamente caso você queira continuar a receber registros do estilhaço após future, que é retornado pela chamada para SubscribeToShard, for concluído normal ou excepcionalmente.

    nota

    SubscribeToShard A API também retorna a lista dos fragmentos filho do fragmento atual quando chega ao final do fragmento.

  3. Para cancelar o registro de um consumidor que esteja usando a distribuição avançada, chame DeregisterStreamConsumer.

O de código a seguir é um exemplo de como você pode inscrever o consumidor em um estilhaço, renovar a assinatura periodicamente e manipular os eventos.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

event.ContinuationSequenceNumber retorna null para indicar que o fragmento passou por uma divisão ou uma mesclagem. O fragmento agora está em estado de CLOSED, e você leu todos os registros de dados disponíveis nele. Nesse cenário, como mostrado no exemplo acima, você pode usar event.childShards para conhecer os fragmentos filho que foram criados pela divisão ou mesclagem do fragmento sendo processado. Para obter mais informações, consulte ChildsHard.