Fase 4: Implementare il produttore - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Fase 4: Implementare il produttore

Questo tutorial utilizza lo scenario reale del monitoraggio del commercio azionario. I seguenti principi illustrano brevemente in che modo questo scenario è mappato alla struttura del codice producer e di supporto.

Consulta il codice sorgente e rivedi le informazioni riportate di seguito.

StockTrade classe

Una singola borsa è rappresentata da un'istanza della StockTrade classe. Questa istanza include attributi come il simbolo dei titoli, il prezzo, il numero di azioni, il tipo di operazione (acquisto o vendita) e un ID univoco che identifica l'operazione. Questa classe è implementata per te.

Record di flusso

Un flusso è una sequenza di record. Un record è una serializzazione di un'StockTradeistanza in JSON formato. Per esempio:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator classe

StockTradeGenerator ha un metodo chiamato getRandomTrade() che restituisce una nuova compravendita di azioni generata casualmente ogni volta che viene richiamata. Questa classe è implementata per te.

StockTradesWriter classe

Il main metodo del produttore recupera StockTradesWriter continuamente uno scambio casuale e quindi lo invia a Kinesis Data Streams eseguendo le seguenti attività:

  1. Legge il nome del flusso di dati e il nome della regione come input.

  2. Utilizza il KinesisAsyncClientBuilder per impostare la regione, le credenziali e la configurazione del client.

  3. Verifica che il flusso esista e sia attivo (in caso contrario, si chiude con un errore).

  4. In un ciclo continuo, chiama il metodo StockTradeGenerator.getRandomTrade() e quindi il metodo sendStockTrade per inviare lo scambio al flusso ogni 100 millisecondi.

Il metodo sendStockTrade della classe StockTradesWriter include il codice seguente:

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

Fai riferimento alla seguente suddivisione del codice:

  • PutRecordAPISi aspetta un array di byte ed è necessario convertire lo scambio in formato. JSON Questa singola riga di codice esegue tale operazione:

    byte[] bytes = trade.toJsonAsBytes();
  • Prima di poter inviare lo scambio, devi creare una nuova istanza PutRecordRequest (denominata richiesta in questo caso): Ogni request richiede il nome del flusso, la chiave di partizione e un blob di dati.

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    L'esempio utilizza uno stock ticker come chiave di partizione, che mappa il record su uno shard specifico. In pratica, dovresti avere centinaia o migliaia di chiavi di partizione per shard, in modo che i record vengano distribuiti in modo uniforme in tutto il flusso. Per ulteriori informazioni su come aggiungere dati a un flusso, consulta Scrittura di dati su Amazon Kinesis Data Streams.

    Ora request è pronto per l'invio al client (operazione put):

    kinesisClient.putRecord(request).get();
  • La verifica e la registrazione degli errori sono sempre aggiunte utili. Questo codice registra le condizioni di errore:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Aggiungi il blocco try/catch per l'operazione put:

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    Questo è perché un'operazione put del flusso di dati Kinesis può non riuscire a causa di un errore di rete o perché il flusso di dati raggiunge il limite di velocità di trasmissione effettiva e viene sottoposto a limitazione. Si consiglia di valutare attentamente la politica di nuovi tentativi per le put operazioni volte a evitare la perdita di dati, come l'utilizzo di un nuovo tentativo.

  • La registrazione dello stato è utile, ma opzionale:

    LOG.info("Putting trade: " + trade.toString());

Il produttore mostrato qui utilizza la funzionalità di registrazione singola di Kinesis API Data Streams,. PutRecord In pratica, se un producer genera numerosi record, spesso è più efficiente utilizzare la funzionalità record multipli di PutRecords e inviare batch di record ogni volta. Per ulteriori informazioni, consulta Scrittura di dati su Amazon Kinesis Data Streams.

Per eseguire il producer
  1. Verificare che la chiave di accesso e la coppia di chiavi segrete recuperate in Fase 2: Creare una policy e un utente IAM siano salvate nel file ~/.aws/credentials.

  2. Eseguire la classe StockTradeWriter con i seguenti argomenti:

    StockTradeStream us-west-2

    Se è stato creato un flusso in una regione diversa da us-west-2, è necessario specificare quella regione qui.

Verrà visualizzato un output simile al seguente:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Il flusso di negoziazioni viene ora importato dal flusso di dati Kinesis.

Passaggi successivi

Fase 5: Implementazione del consumatore