Etapa 4: Implementar o produtor - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Etapa 4: Implementar o produtor

O aplicativo no Tutorial: Processar dados de ações em tempo real usando KCL e KCL 1.x usa o cenário real de monitoramento de negociações em bolsa de valores. Os princípios a seguir explicam brevemente como este cenário é mapeado para o produtor e a estrutura de código de apoio.

Consulte o código-fonte e analise as informações a seguir.

StockTradeclasse

Uma negociação de ação individual é representada por uma instância da classe StockTrade. Essa instância contém atributos como o símbolo ticker, o preço, o número de ações, o tipo da negociação (compra ou venda) e um ID que identifica a negociação com exclusividade. Essa classe é implementada para você.

Registro de stream

Um stream é uma sequência de registros. Um registro é uma serialização de uma instância StockTrade no formato JSON. Por exemplo:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeClasse geradora

StockTradeGenerator tem um método denominado getRandomTrade(), que retorna uma nova negociação de ações gerada aleatoriamente sempre que ela é invocada. Essa classe é implementada para você.

StockTradesClasse de gravador

Omainmétodo do produtor,StockTradesWriterRecupera continuamente uma transação aleatória e a envia ao Kinesis Data Streams executando as seguintes tarefas:

  1. Lê o nome do stream e o nome da região como entrada.

  2. Cria um AmazonKinesisClientBuilder.

  3. Usa o criador do cliente para definir região, credenciais e configuração do cliente.

  4. Cria um cliente AmazonKinesis usando o criador do cliente.

  5. Verifica se o stream existe e está ativo (se não, ele será encerrado com um erro).

  6. Em um loop contínuo, chama o método StockTradeGenerator.getRandomTrade() e o método sendStockTrade para enviar a negociação ao stream a cada 100 milissegundos.

O método sendStockTrade da classe StockTradesWriter tem o seguinte código:

private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }

Consulte o desmembramento do código a seguir:

  • A API de PutRecord espera uma matriz de bytes, e você precisa converter trade no formato JSON. Essa única linha de código executa a seguinte operação:

    byte[] bytes = trade.toJsonAsBytes();
  • Antes de enviar a negociação, você cria uma nova instância de PutRecordRequest (denominada putRecord neste caso):

    PutRecordRequest putRecord = new PutRecordRequest();

    Cada chamada a PutRecord requer o nome do stream, uma chave de partição e um blob de dados. O código a seguir preenche esses campos no objeto putRecord usando seus métodos setXxxx():

    putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));

    O exemplo usa um tíquete de ações como uma chave de partição, que mapeia o registro para um determinado estilhaço. Na prática, você deve ter centenas ou milhares de chaves de partição por estilhaço, de forma que os registros sejam uniformemente disseminados no seu stream. Para obter mais informações sobre como adicionar dados a um stream, consulte Adicionar dados a um stream.

    Agora putRecord está pronto para enviar para o cliente (operação put):

    kinesisClient.putRecord(putRecord);
  • A verificação e o registro de erros são sempre inclusões úteis. Este código registra condições de erro:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Adicione o bloco try/catch ao redor da operação put:

    try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }

    Isso ocorre porque um Kinesis Data StreamsputA operação do pode falhar devido a erro de rede ou porque o stream pode atingir o limite de taxa de transferência e ficar limitado. Recomendamos considerar cuidadosamente sua política de retentativa para operações put a fim de evitar perda de dados, por exemplo, usando como uma simples retentativa.

  • O registro de status é útil mas opcional:

    LOG.info("Putting trade: " + trade.toString());

O produtor mostrado aqui usa a funcionalidade de registro único da API do Kinesis Data Streams,PutRecord. Na prática, se um produtor individual gerar muitos registros, costuma ser mais eficiente usar a funcionalidade de vários registros de PutRecords e enviar lotes de registros por vez. Para obter mais informações, consulte Adicionar dados a um stream.

Para executar o produtor
  1. Verifique se o par chave de acesso e chave secreta recuperado anteriormente (durante a criação do usuário do IAM) foi salvo no arquivo~/.aws/credentials.

  2. Execute a classe StockTradeWriter com os seguintes argumentos:

    StockTradeStream us-west-2

    Se você criou o stream em uma região diferente de us-west-2, precisará especificar essa região aqui.

Você deve ver saída semelhante a:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Seu stream de negociações de ações agora está sendo ingerido pelo Kinesis Data Streams.

Próximas etapas

Etapa 5: Implementar o consumidor