Etapa 5: Implementar o consumidor - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Etapa 5: Implementar o consumidor

O aplicativo consumidor neste tutorial processa continuamente as transações de ações em seu fluxo de dados. Em seguida, ele produz as ações mais populares compradas e vendidas a cada minuto. O aplicativo é compilado sobre a Kinesis Client Library (KCL), que faz grande parte do trabalho pesado comum a aplicativos consumidores. Para obter mais informações, consulte Usar a biblioteca de cliente Kinesis.

Consulte o código-fonte e analise as informações a seguir.

StockTradesClasse de processador

Principal classe do consumidor fornecida e que executa as seguintes tarefas:

  • Lê o aplicativo, o fluxo de dados e os nomes de região passados como argumentos.

  • Cria uma instância de KinesisAsyncClient com o nome da região.

  • Cria uma instância de StockTradeRecordProcessorFactory que veicula instâncias de ShardRecordProcessor, implementadas por uma instância de StockTradeRecordProcessor.

  • Cria uma instância de ConfigsBuilder com a instância de KinesisAsyncClient, StreamName, ApplicationName e StockTradeRecordProcessorFactory. Isso é útil para criar todas as configurações com valores padrão.

  • Cria um programador da KCL (anteriormente, nas versões 1.x da KCL, era conhecido como o operador da KCL) com a instância de ConfigsBuilder.

  • O programador cria um novo thread para cada estilhaço (atribuído a essa instância de consumidor), que faz loop continuamente para ler registros do fluxo de dados. Em seguida, ele invoca a instância de StockTradeRecordProcessor para processar cada lote de registros recebidos.

StockTradeRecordProcessorclasse

Implementação da instância de StockTradeRecordProcessor, que, por sua vez, implementa cinco métodos necessários: initialize, processRecords, leaseLost, shardEnded e shutdownRequested.

Os métodos initialize e shutdownRequested são usados pela KCL para permitir que o processador de registros saiba quando ele deve estar pronto para começar a receber registros e quando ele deve esperar parar de receber registros, respectivamente, para que ele possa executar qualquer configuração específica do aplicativo e tarefas de encerramento. leaseLost e shardEnded são usados para implementar qualquer lógica para o que fazer quando um contrato de aluguel é perdido ou um processamento chegou ao fim de um estilhaço. Neste exemplo, simplesmente registramos mensagens indicando esses eventos.

O código para esses métodos é fornecido para você. O processamento principal ocorre no método processRecords, que, por sua vez, usa processRecord para cada registro. Esse último método é fornecido como o código esqueleto quase todo vazio, para você implementar na próxima etapa, onde é explicado em mais detalhes.

Observe também a implementação dos métodos de suporte de processRecord: reportStats e resetStats, que estão vazios no código-fonte original.

O método processRecords, implementado para você, executa as seguintes etapas:

  • Para cada registro passado, ele chama processRecord.

  • Se tiver decorrido pelo menos 1 minuto após o último relatório, chamará reportStats(), que imprime as estatísticas mais recentes e, em seguida, resetStats(), que limpa as estatísticas para que o próximo intervalo inclua apenas registros novos.

  • Define o próximo horário para geração de relatórios.

  • Se tiver decorrido pelo menos 1 minuto após o último ponto de verificação, chamará checkpoint().

  • Define o próximo horário do ponto de verificação.

Este método usa intervalos de 60 segundos como taxa de geração de relatórios e definição de pontos de verificação. Para obter mais informações sobre definição de pontos de verificação, consulteUsando a biblioteca de cliente do Kinesis.

StockStatsclasse

Essa classe fornece retenção de dados e rastreamento de estatísticas em relação às ações mais populares ao longo do tempo. Esse código, fornecido para você, contém os seguintes métodos:

  • addStockTrade(StockTrade): injeta o StockTrade conhecido nas estatísticas correntes.

  • toString(): retorna as estatísticas em uma string formatada.

Essa classe rastreia as ações mais populares mantendo uma contagem corrente do número total de negociações de cada ação e a contagem máxima. Ela atualiza essas contagens sempre que chega uma negociação de ação.

Adicione código aos métodos da classe StockTradeRecordProcessor, como mostrado nas etapas a seguir.

Para implementar o consumidor
  1. Implemente o método processRecord instanciando um objeto StockTrade de tamanho correto e adicionando a ele os dados do registro, registrando um aviso caso ocorra problema.

    byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
  2. Implemente um método reportStats simples. Sinta-se à vontade para modificar o formato de saída mais adequado às suas preferências.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. Implemente o método resetStats, que cria uma nova instância de stockStats.

    stockStats = new StockStats();
  4. Implemente os seguintes métodos exigidos pela interface ShardRecordProcessor

    @Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
Para executar o consumidor
  1. Execute o produtor escrito em Etapa 4: Implementar o produtor para injetar registros de negociações de ações no streaming.

  2. Verifique se o par chave de acesso/chave secreta recuperado anteriormente (durante a criação do usuário do IAM) foi salvo no arquivo~/.aws/credentials.

  3. Execute a classe StockTradesProcessor com os seguintes argumentos:

    StockTradesProcessor StockTradeStream us-west-2

    Observe que, se você criou o fluxo em uma região diferente de us-west-2, será necessário especificar essa região aqui.

Depois de um minuto, deverá aparecer uma saída como a seguir, atualizada a cada minuto a partir de então:

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

Próximas etapas

Etapa 6: (Opcional) Estender o consumidor