Paso 4: Implementar el productor - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Paso 4: Implementar el productor

Este tutorial utiliza el escenario del mundo real de la monitorización del mercado bursátil. Los siguientes principios explican brevemente cómo este escenario se asigna al productor y su estructura de código compatible.

Consulte el código fuente y revise la siguiente información.

Clase StockTrade

Un comercio de acciones individual está representado por una instancia de la clase StockTrade. Esta instancia contiene atributos como el símbolo de cotización, el precio, el número de acciones, el tipo de transacción (compra o venta) y un ID que identifica de forma exclusiva la transacción. Esta clase se implementa por usted.

Registro de la secuencia

Una secuencia es una serie de registros. Un registro es la sucesión en serie de una instancia de StockTrade en formato JSON. Por ejemplo:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
Clase StockTradeGenerator

StockTradeGenerator tiene un método denominado getRandomTrade() que devuelve una nueva operación de comercio de existencias generada aleatoriamente cada vez que se invoca. Esta clase se implementa por usted.

Clase StockTradesWriter

El método main del productor, StockTradesWriter recupera continuamente una operación aleatoria y luego la envía a Kinesis Data Streams mediante la ejecución de las siguientes tareas:

  1. Lee los nombres de la secuencia y el nombre de la región como entrada.

  2. Utiliza el KinesisAsyncClientBuilder para establecer la región, las credenciales y la configuración de cliente.

  3. Comprueba que la secuencia existe y está activa (si no, sale con un error).

  4. En un bucle continuo, llama al método StockTradeGenerator.getRandomTrade() y después al método sendStockTrade para enviar la transacción a la secuencia cada 100 milisegundos.

El método sendStockTrade de la clase StockTradesWriter tiene el siguiente código:

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

Consulte el siguiente desglose del código:

  • La API de PutRecord espera una matriz de bytes y debe convertir la transacción al formato JSON. Esta única línea de código realiza esa operación:

    byte[] bytes = trade.toJsonAsBytes();
  • Antes de poder enviar la transacción, debe crear una nueva instancia de PutRecordRequest (denominada solicitud en este caso). Cada llamada a request requiere el nombre de la secuencia, la clave de partición y el blob de datos.

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    El ejemplo utiliza un ticker como clave de partición, que asigna el registro a una partición específica. En la práctica, debería tener cientos o miles de claves de partición por fragmento, de forma que los registros se dispersen de forma uniforme en toda la secuencia. Para obtener más información acerca de cómo agregar datos a una secuencia, consulte Escritura de datos en Amazon Kinesis Data Streams.

    Ahora request estará listo para el envío al cliente (la operación PUT):

    kinesisClient.putRecord(request).get();
  • Siempre es útil agregar funciones de comprobación y registro de errores. Este código registra las condiciones de error:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Agregue el bloque try/catch a la operación put:

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    Esto se debe a que una operación put de Kinesis Data Streams puede fallar debido a un error de red o debido a que la secuencia de datos alcanza sus límites de rendimiento y se estrangula. Le recomendamos comprobar detalladamente su política de reintentos para las operaciones put de modo que evite la pérdida de datos, por ejemplo utilizando un reintento simple.

  • El registro de estado resulta útil, pero es opcional:

    LOG.info("Putting trade: " + trade.toString());

El productor que se muestra aquí utiliza la funcionalidad de registro único de la API de Kinesis Data Streams, PutRecord. En la práctica, si un solo productor genera una gran cantidad de registros, suele ser más eficaz utilizar la funcionalidad de varios registros de PutRecords y enviar lotes de registros de una vez. Para obtener más información, consulte Escritura de datos en Amazon Kinesis Data Streams.

Para ejecutar el productor
  1. Compruebe que el par de clave de acceso y el par de clave secreta recuperadas en Paso 2: Crear una política y un usuario de IAM se guardan en el archivo ~/.aws/credentials.

  2. Ejecute la clase StockTradeWriter con los siguientes argumentos:

    StockTradeStream us-west-2

    Si ha creado su secuencia en una región diferente a us-west-2 tendrá que especificar esa región aquí.

Debería ver una salida similar a esta:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Ahora, Kinesis Data Streams está ingiriendo sus operaciones bursátiles.

Pasos siguientes

Paso 5: Implementar el consumidor