Paso 4: Implementar el productor - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Paso 4: Implementar el productor

La aplicación del Tutorial: Procesamiento de operaciones bursátiles en tiempo real con KPL y KCL 1.x utiliza un escenario real de monitorización del mercado bursátil. Los siguientes principios explicar brevemente la forma en que este escenario se asocia con la estructura del productor y el código de apoyo.

Consulte el código fuente y revise la siguiente información.

Clase StockTrade

Una operación bursátil está representada por una instancia de la clase StockTrade. Esta instancia contiene atributos como el símbolo de cotización, el precio, el número de acciones, el tipo de transacción (compra o venta) y un ID que identifica de forma exclusiva la transacción. Esta clase se implementa por usted.

Registro de la secuencia

Una secuencia es una serie de registros. Un registro es la sucesión en serie de una instancia de StockTrade en formato JSON. Por ejemplo:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
Clase StockTradeGenerator

StockTradeGenerator tiene un método denominado getRandomTrade() que proporciona una nueva operación bursátil generada aleatoriamente cada vez que se invoca. Esta clase se implementa por usted.

Clase StockTradesWriter

El método main del productor, StockTradesWriter recupera continuamente una operación aleatoria y luego la envía a Kinesis Data Streams mediante la ejecución de las siguientes tareas:

  1. Lee los nombres de la secuencia y la región como entrada.

  2. Crea un AmazonKinesisClientBuilder.

  3. Utiliza el compilador de clientes para establecer la región, las credenciales y la configuración de cliente.

  4. Crea un cliente de AmazonKinesis mediante el compilador de clientes.

  5. Comprueba que la secuencia existe y está activa (si no, sale con un error).

  6. En un bucle continuo, llama al método StockTradeGenerator.getRandomTrade() y después al método sendStockTrade para enviar la transacción a la secuencia cada 100 milisegundos.

El método sendStockTrade de la clase StockTradesWriter tiene el siguiente código:

private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }

Consulte el siguiente desglose del código:

  • La API de PutRecord espera una matriz de bytes, y debe convertir la trade a formato JSON. Esta única línea de código realiza esa operación:

    byte[] bytes = trade.toJsonAsBytes();
  • Antes de poder enviar la transacción, debe crear una nueva instancia de PutRecordRequest (denominada putRecord en este caso):

    PutRecordRequest putRecord = new PutRecordRequest();

    Cada llamada a PutRecord requiere el nombre de la secuencia, la clave de partición y el blob de datos. El siguiente código rellenará estos campos en el objeto putRecord utilizando sus métodos setXxxx():

    putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));

    El ejemplo utiliza un ticker como clave de partición, que asigna el registro a una partición específica. En la práctica, debería tener cientos o miles de claves de partición por fragmento, de forma que los registros se dispersen de forma uniforme en toda la secuencia. Para obtener más información acerca de cómo agregar datos a una secuencia, consulte Agregar datos a una secuencia.

    Ahora putRecord estará listo para el envío al cliente (la operación put):

    kinesisClient.putRecord(putRecord);
  • Siempre es útil agregar funciones de comprobación y registro de errores. Este código registra las condiciones de error:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Agregue el bloque try/catch a la operación put:

    try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }

    Esto se debe a que una operación put de Kinesis Data Streams puede fallar debido a un error de red o debido a que el flujo alcanza sus límites de rendimiento y se ve limitado. Le recomendamos comprobar detalladamente su política de reintentos para las operaciones put de modo que evite la pérdida de datos, por ejemplo utilizando un reintento simple.

  • El registro de estado resulta útil, pero es opcional:

    LOG.info("Putting trade: " + trade.toString());

El productor que se muestra aquí utiliza la funcionalidad de registro único de la API de Kinesis Data Streams, PutRecord. En la práctica, si un solo productor genera una gran cantidad de registros, suele ser más eficaz utilizar la funcionalidad de varios registros de PutRecords y enviar lotes de registros de una vez. Para obtener más información, consulte Agregar datos a una secuencia.

Para ejecutar el productor
  1. Compruebe que la clave de acceso y el par de claves secretas recuperadas anteriormente (al crear el usuario de IAM) se guardaron en el archivo ~/.aws/credentials.

  2. Ejecute la clase StockTradeWriter con los siguientes argumentos:

    StockTradeStream us-west-2

    Si ha creado su secuencia en una región diferente a us-west-2 tendrá que especificar esa región aquí.

Debería ver una salida similar a esta:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

El flujo de operaciones bursátiles está siendo adquirido por Kinesis Data Streams.

Pasos siguientes

Paso 5: Implementar el consumidor