Desenvolver consumidores de saída avançada com a API do Kinesis Data Streams - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolver consumidores de saída avançada com a API do Kinesis Data Streams

Fan-out aprimoradoÉ um recurso do Amazon Kinesis Data Streams que permite que os consumidores recebam registros de um streaming de dados com taxa de transferência dedicada de até 2 MB de dados por segundo por estilhaço. Um consumidor que usa distribuição avançada não precisa lidar com outros consumidores que estejam recebendo dados do streaming. Para obter mais informações, consulte Desenvolver consumidores personalizados com taxa de transferência dedicada (distribuição avançada).

Você pode usar operações de API para criar um consumidor que use a distribuição avançada no Kinesis Data Streams.

Para registrar um consumidor com a distribuição avançada usando a API do Kinesis Data Streams
  1. ChameRegisterStreamConsumerPara registrar o aplicativo como um consumidor que use a distribuição avançada. O Kinesis Data Streams gera um Amazon Resource Name (ARN — Nome de recurso da Amazon) para o consumidor e o retorna na resposta.

  2. Para começar a ouvir um determinado estilhaço, passe o ARN do consumidor em uma chamada paraSubscribeToShard. Em seguida, o Kinesis Data Streams começa a enviar os registros do estilhaço para você, na forma de eventos do tipoSubscribeToShardEventatravés de uma conexão HTTP/2. A conexão permanece aberta por até 5 minutos. Chame SubscribeToShard novamente caso você queira continuar a receber registros do estilhaço após future, que é retornado pela chamada para SubscribeToShard, for concluído normal ou excepcionalmente.

    nota

    SubscribeToShardA API também retorna a lista dos fragmentos filhos do fragmento atual quando o final do fragmento atual é atingido.

  3. Para cancelar o registro de um consumidor que esteja usando a distribuição avançada, chame DeregisterStreamConsumer.

O de código a seguir é um exemplo de como você pode inscrever o consumidor em um estilhaço, renovar a assinatura periodicamente e manipular os eventos.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

Seevent.ContinuationSequenceNumberretornanull, indica que ocorreu uma divisão ou fusão de fragmentos que envolveu esse fragmento. Este fragmento está agora em umCLOSEDstate, e você leu todos os registros de dados disponíveis deste fragmento. Nesse cenário, por exemplo acima, você pode usarevent.childShardsPara saber mais sobre os novos estilhaços do estilhaço que está sendo processado que foram criados pela divisão ou fusão. Para obter mais informações, consulteChildShard.