Implemente o produtor - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Implemente o produtor

Este tutorial usa o cenário do mundo real de monitoramento de transações da bolsa de valores. Os princípios a seguir explicam brevemente como este cenário é mapeado para o produtor e a estrutura de código de suporte.

Consulte o código-fonte e analise as informações a seguir.

StockTrade classe

Uma negociação de ações individual é representada por uma instância da StockTrade classe. Essa instância contém atributos como o símbolo ticker, o preço, o número de ações, o tipo da negociação (compra ou venda) e um ID que identifica a negociação com exclusividade. Essa classe é implementada para você.

Registro de stream

Um stream é uma sequência de registros. Um registro é uma serialização de uma StockTrade instância em JSON formato. Por exemplo:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator classe

StockTradeGenerator tem um método chamado getRandomTrade() que retorna uma nova negociação de ações gerada aleatoriamente toda vez que é invocada. Essa classe é implementada para você.

StockTradesWriter classe

O main método do produtor recupera StockTradesWriter continuamente uma negociação aleatória e a envia para o Kinesis Data Streams executando as seguintes tarefas:

  1. Lê o nome do fluxo de dados e o nome da região como entrada.

  2. Usa o KinesisAsyncClientBuilder para definir a região, as credenciais e a configuração do cliente.

  3. Verifica se o stream existe e está ativo (se não, ele será encerrado com um erro).

  4. Em um loop contínuo, chama o método StockTradeGenerator.getRandomTrade() e o método sendStockTrade para enviar a negociação ao stream a cada 100 milissegundos.

O método sendStockTrade da classe StockTradesWriter tem o seguinte código:

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

Consulte o desmembramento do código a seguir:

  • O PutRecord API espera uma matriz de bytes e você deve converter a negociação em JSON formato. Essa única linha de código executa a seguinte operação:

    byte[] bytes = trade.toJsonAsBytes();
  • Antes de enviar a transação, crie uma nova instância de PutRecordRequest (chamada solicitação neste caso). Cada request exige o nome do fluxo, uma chave de partição e um blob de dados.

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    O exemplo usa um ticker de ações como chave de partição, que mapeia o registro para um fragmento específico. Na prática, você deve ter centenas ou milhares de chaves de partição por estilhaço, de forma que os registros sejam uniformemente disseminados no seu stream. Para obter mais informações sobre como adicionar dados a um stream, consulte Grave dados no Amazon Kinesis Data Streams.

    Agora, request está pronto para enviar para o cliente (operação put):

    kinesisClient.putRecord(request).get();
  • A verificação e o registro de erros são sempre inclusões úteis. Este código registra condições de erro:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Adicione o bloco try/catch ao redor da operação put:

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    Isso ocorre porque uma operação put do Kinesis Data Streams pode falhar devido a erro de rede ou porque o fluxo de dados pode atingir o limite de throughput e ficar limitado. É recomendável que você considere cuidadosamente sua política de repetição de put operações para evitar perda de dados, como usar uma nova tentativa.

  • O registro de status é útil mas opcional:

    LOG.info("Putting trade: " + trade.toString());

O produtor mostrado aqui usa a funcionalidade de registro único do Kinesis API Data Streams,. PutRecord Na prática, se um produtor individual gerar muitos registros, costuma ser mais eficiente usar a funcionalidade de vários registros de PutRecords e enviar lotes de registros por vez. Para obter mais informações, consulte Grave dados no Amazon Kinesis Data Streams.

Para executar o produtor
  1. Verifique se a chave de acesso e o par de chaves secretas recuperados em Crie uma IAM política e um usuário estão salvos no arquivo ~/.aws/credentials.

  2. Execute a classe StockTradeWriter com os seguintes argumentos:

    StockTradeStream us-west-2

    Se você criou o fluxo em uma região diferente de us-west-2, será necessário especificar essa região aqui.

Você deve ver saída semelhante a:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Suas negociações de ações agora estão sendo ingeridas pelo Kinesis Data Streams.

Próximas etapas

Implemente o consumidor