Fase 4: implementazione del producer - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Fase 4: implementazione del producer

L'applicazione nel Tutorial: elaborazione in tempo reale dei dati relativi ai titoli azionari utilizzando KPL e KCL 1.x utilizza come scenario quello del monitoraggio del mercato azionario reale. I seguenti principi illustrano brevemente in che modo questo scenario è mappato alla struttura del codice producer e di supporto.

Consulta il codice sorgente e rivedi le informazioni riportate di seguito.

Classe StockTrade

Una singola negoziazione è rappresentata da un'istanza della classe StockTrade. Questa istanza include attributi come il simbolo dei titoli, il prezzo, il numero di azioni, il tipo di operazione (acquisto o vendita) e un ID univoco che identifica l'operazione. Questa classe è implementata per te.

Record di flusso

Un flusso è una sequenza di record. Un record è una serializzazione di un'istanza StockTrade in formato JSON. Ad esempio:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
Classe StockTradeGenerator

StockTradeGenerator include un metodo denominato getRandomTrade() che restituisce una negoziazione generata casualmente ogni volta che viene invocata. Questa classe è implementata per te.

Classe StockTradesWriter

Il metodo main del producer, StockTradesWriter recupera continuamente uno scambio casuale e lo invia a Flusso di dati Kinesis eseguendo queste operazioni:

  1. Legge il nome del flusso e il nome della regione come input.

  2. Crea un AmazonKinesisClientBuilder.

  3. Utilizza il generatore client per impostare la regione, le credenziali e la configurazione del client.

  4. Crea un client AmazonKinesis utilizzando il generatore di client.

  5. Verifica che il flusso esista e sia attivo (in caso contrario, si chiude con un errore).

  6. In un ciclo continuo, chiama il metodo StockTradeGenerator.getRandomTrade() e quindi il metodo sendStockTrade per inviare lo scambio al flusso ogni 100 millisecondi.

Il metodo sendStockTrade della classe StockTradesWriter include il codice seguente:

private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }

Fai riferimento alla seguente suddivisione del codice:

  • L'API PutRecord prevede una matrice di byte e devi convertire trade in formato JSON. Questa singola riga di codice esegue tale operazione:

    byte[] bytes = trade.toJsonAsBytes();
  • Prima di poter inviare lo scambio, devi creare una nuova istanza PutRecordRequest (denominata putRecord in questo caso):

    PutRecordRequest putRecord = new PutRecordRequest();

    Ogni chiamata PutRecord richiede il nome del flusso, la chiave di partizione e il blob di dati. Il codice seguente compila questi campi nell'oggetto putRecord utilizzando i relativi metodi setXxxx():

    putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));

    Questo esempio utilizza un ticket per titoli come chiave di partizione, che mappa il record a un determinato shard. In pratica, dovresti avere centinaia o migliaia di chiavi di partizione per shard, in modo che i record vengano distribuiti in modo uniforme in tutto il flusso. Per ulteriori informazioni su come aggiungere dati a un flusso, consulta Aggiunta di dati a un flusso.

    Ora putRecord è pronto per l'invio al client (operazione put):

    kinesisClient.putRecord(putRecord);
  • La verifica e la registrazione degli errori sono sempre aggiunte utili. Questo codice registra le condizioni di errore:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Aggiungi il blocco try/catch per l'operazione put:

    try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }

    Questo è perché un'operazione put di Flusso di dati Kinesis potrebbe non riuscire a causa di un errore di rete o perché il flusso di dati raggiunge il limite di velocità di trasmissione effettiva e viene sottoposto a limitazione. Ti consigliamo di valutare attentamente la policy per i nuovi tentativi per le operazioni put per evitare la perdita di dati, ad esempio utilizzando una funzione riprova semplice.

  • La registrazione dello stato è utile, ma opzionale:

    LOG.info("Putting trade: " + trade.toString());

Il producer mostrato qui utilizza la funzionalità record singolo dell'API di Flusso di dati Kinesis, PutRecord. In pratica, se un producer genera numerosi record, spesso è più efficiente utilizzare la funzionalità record multipli di PutRecords e inviare batch di record ogni volta. Per ulteriori informazioni, consulta Aggiunta di dati a un flusso.

Per eseguire il producer
  1. Verifica che la coppia chiave di accesso e chiave segreta recuperata durante la creazione dell'utente IAM sia stata salvata nel file ~/.aws/credentials.

  2. Eseguire la classe StockTradeWriter con i seguenti argomenti:

    StockTradeStream us-west-2

    Se è stato creato un flusso in una regione diversa da us-west-2, è necessario specificare quella regione qui.

Verrà visualizzato un output simile al seguente:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Il flusso di negoziazioni viene ora importato da Flusso di dati Kinesis.

Fasi successive

Fase 5: implementazione del consumer