Etapa 4: Implementar o produtor - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Etapa 4: Implementar o produtor

Este tutorial usa o cenário do mundo real de monitoramento de transações da bolsa de valores. Os princípios a seguir explicam brevemente como este cenário é mapeado para o produtor e a estrutura de código de suporte.

Consulte o código-fonte e analise as informações a seguir.

StockTradeclasse

Uma negociação de ação individual é representada por uma instância da classe StockTrade. Essa instância contém atributos como o símbolo ticker, o preço, o número de ações, o tipo da negociação (compra ou venda) e um ID que identifica a negociação com exclusividade. Essa classe é implementada para você.

Registro de stream

Um stream é uma sequência de registros. Um registro é uma serialização de uma instância StockTrade no formato JSON. Por exemplo:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeClasse geradora

StockTradeO gerador tem um método chamadogetRandomTrade()O que retorna uma nova negociação de ações gerada aleatoriamente sempre que ela é invocada. Essa classe é implementada para você.

StockTradesClasse de gravador

Omainmétodo do produtor,StockTradesO Writer recupera continuamente uma negociação aleatória e a envia ao Kinesis Data Streams executando as seguintes tarefas:

  1. Lê o nome do fluxo de dados e o nome da região como entrada.

  2. Usa o KinesisAsyncClientBuilder para definir região, credenciais e configuração do cliente.

  3. Verifica se o stream existe e está ativo (se não, ele será encerrado com um erro).

  4. Em um loop contínuo, chama o método StockTradeGenerator.getRandomTrade() e o método sendStockTrade para enviar a negociação ao stream a cada 100 milissegundos.

O método sendStockTrade da classe StockTradesWriter tem o seguinte código:

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

Consulte o desmembramento do código a seguir:

  • A API PutRecord espera uma matriz de bytes, e é necessário converter a transação para o formato JSON. Essa única linha de código executa a seguinte operação:

    byte[] bytes = trade.toJsonAsBytes();
  • Antes de enviar a transação, crie uma nova instância de PutRecordRequest (chamada solicitação neste caso). Cada request exige o nome do fluxo, uma chave de partição e um blob de dados.

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    O exemplo usa um tíquete de ações como uma chave de partição, que mapeia o registro para um determinado estilhaço. Na prática, você deve ter centenas ou milhares de chaves de partição por estilhaço, de forma que os registros sejam uniformemente disseminados no seu stream. Para obter mais informações sobre como adicionar dados a um stream, consulte Gravando dados no Amazon Kinesis Data Streams.

    Agora, request está pronto para enviar para o cliente (operação put):

    kinesisClient.putRecord(request).get();
  • A verificação e o registro de erros são sempre inclusões úteis. Este código registra condições de erro:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Adicione o bloco try/catch ao redor da operação put:

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    Isso ocorre porque uma operação put do Kinesis Data Streams pode falhar devido a erro de rede ou porque o fluxo de dados pode atingir o limite de taxa de transferência e ficar limitado. É recomendado considerar cuidadosamente sua política de tentativa para operações put a fim de evitar perda de dados, por exemplo, usando como uma simples tentativa.

  • O registro de status é útil mas opcional:

    LOG.info("Putting trade: " + trade.toString());

O produtor mostrado aqui usa a funcionalidade de registro único da API do Kinesis Data Streams,PutRecord. Na prática, se um produtor individual gerar muitos registros, costuma ser mais eficiente usar a funcionalidade de vários registros de PutRecords e enviar lotes de registros por vez. Para obter mais informações, consulte Gravando dados no Amazon Kinesis Data Streams.

Para executar o produtor
  1. Verifique se a chave de acesso e o par de chaves secretas recuperados em Etapa 2: Criar uma política e um usuário do IAM estão salvos no arquivo ~/.aws/credentials.

  2. Execute a classe StockTradeWriter com os seguintes argumentos:

    StockTradeStream us-west-2

    Se você criou o fluxo em uma região diferente de us-west-2, será necessário especificar essa região aqui.

Você deve ver saída semelhante a:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Suas transações de ações estão sendo ingeridas pelo Kinesis Data Streams.

Próximas etapas

Etapa 5: Implementar o consumidor