Implementa il consumatore - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Implementa il consumatore

L'applicazione consumer in questo tutorial elabora continuamente le operazioni azionarie nel flusso di dati. Quindi, genera i titoli più acquistati e venduti ogni minuto. L'applicazione si basa sulla Kinesis Client Library (KCL), che svolge gran parte del lavoro pesante comune alle app consumer. Per ulteriori informazioni, consulta Usa la libreria Kinesis Client.

Consulta il codice sorgente e rivedi le informazioni riportate di seguito.

StockTradesProcessor classe

La classe principale del consumatore, fornita per te, che svolge le seguenti attività:

  • Legge l'applicazione, il flusso di dati e i nomi delle regioni, passati come argomenti.

  • Crea un'KinesisAsyncClientistanza con il nome della regione.

  • Crea un'istanza StockTradeRecordProcessorFactory che serve istanze di ShardRecordProcessor, implementate da un'istanza StockTradeRecordProcessor.

  • Crea un'ConfigsBuilderistanza con l'StockTradeRecordProcessorFactoryistanza KinesisAsyncClient StreamNameApplicationName,, e. Questo è utile per creare tutte le configurazioni con valori predefiniti.

  • Crea uno KCL scheduler (in precedenza, nelle KCL versioni 1.x era noto come KCL worker) con l'ConfigsBuilderistanza.

  • Lo scheduler crea un nuovo thread per ciascun shard (assegnato a questa istanza consumer), che in un ciclo continuo legge i record dai flussi di dati. Quindi invoca l'istanza StockTradeRecordProcessor per elaborare ogni batch di record ricevuto.

StockTradeRecordProcessor classe

Implementazione dell'istanza StockTradeRecordProcessor, che a sua volta implementa cinque metodi richiesti: initialize, processRecords, leaseLost, shardEnded e shutdownRequested.

I shutdownRequested metodi initialize and vengono utilizzati per consentire KCL all'elaboratore di registrazione di sapere rispettivamente quando dovrebbe essere pronto per iniziare a ricevere i record e quando dovrebbe aspettarsi di smettere di riceverli, in modo da poter eseguire qualsiasi attività di configurazione e terminazione specifiche dell'applicazione. leaseLoste shardEnded vengono utilizzati per implementare qualsiasi logica relativa alle azioni da intraprendere in caso di perdita di un leasing o di un'elaborazione giunta al termine di uno shard. In questo esempio, registriamo semplicemente i messaggi che indicano questi eventi.

Ti forniamo il codice per questi metodi. L'elaborazione principale si verifica nel metodo processRecords, che a sua volta utilizza processRecord per ogni record. Quest'ultimo metodo viene fornito come codice di base per lo più vuoto da implementare nella fase successiva, dove è spiegato in modo dettagliato.

Da segnalare è anche l'implementazione dei metodi di supporto per processRecord, ovvero reportStats e resetStats, che sono vuoti nel codice sorgente originale.

Il metodo processRecords viene implementato per te ed esegue questa procedura:

  • Per ogni record passato, chiama processRecord su di esso.

  • Se è trascorso almeno 1 minuto dall'ultimo report, chiama reportStats(), che consente di stampare le statistiche più recenti, seguito da resetStats(), che cancella le statistiche in modo che l'intervallo successivo includa solo i nuovi record.

  • Imposta l'orario della creazione di report successiva.

  • Se è trascorso almeno 1 minuto dall'ultimo checkpoint, chiama checkpoint().

  • Imposta l'orario della creazione di checkpoint successiva.

Questo metodo utilizza intervalli di 60 secondi per la frequenza di creazione di report e checkpoint. Per ulteriori informazioni sul checkpointing, consulta Utilizzo della Kinesis Client Library.

StockStats classe

Questa classe fornisce la conservazione dei dati e il monitoraggio delle statistiche per i titoli più comuni nel tempo. Questo codice viene fornito per te e include i seguenti metodi:

  • addStockTrade(StockTrade): inserisce un dato StockTrade nelle statistiche in esecuzione.

  • toString(): restituisce le statistiche in una stringa formattata.

Questa classe tiene traccia delle azioni più popolari tenendo un conteggio progressivo del numero totale di scambi per ogni azione e del conteggio massimo. Aggiorna questi conteggi ogni volta che si verifica uno scambio.

Aggiungi codice ai metodi della classe StockTradeRecordProcessor, come mostrato nella procedura seguente.

Per implementare il consumer
  1. Implementare il metodo processRecord creando un'istanza di un oggetto StockTrade delle dimensioni corrette e aggiungendo a essa i dati del record, registrando un avviso se si verifica un problema.

    byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
  2. Implementa un reportStats metodo. Modifica il formato di output in base alle tue preferenze.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. Implementare il metodo resetStats, che crea una nuova istanza stockStats.

    stockStats = new StockStats();
  4. Implementa i seguenti metodi richiesti dall'ShardRecordProcessorinterfaccia:

    @Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
Per eseguire il consumer
  1. Eseguire il producer scritto in Implementa il produttore per inserire record di scambi simulati nel flusso.

  2. Verificare che la chiave di accesso e la coppia di chiavi segrete recuperate in precedenza (durante la creazione IAM dell'utente) siano salvate nel file~/.aws/credentials.

  3. Eseguire la classe StockTradesProcessor con i seguenti argomenti:

    StockTradesProcessor StockTradeStream us-west-2

    Nota: se è stato creato un flusso in una regione diversa da us-west-2, è necessario specificare quella regione qui.

Dopo un minuto, si dovrebbe visualizzare un output come il seguente, aggiornato ogni minuto:

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

Passaggi successivi

(Facoltativo) Estendi il consumatore