Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Implementa il consumatore
L'applicazione consumer in questo tutorial elabora continuamente le operazioni azionarie nel flusso di dati. Quindi, genera i titoli più acquistati e venduti ogni minuto. L'applicazione si basa sulla Kinesis Client Library (KCL), che svolge gran parte del lavoro pesante comune alle app consumer. Per ulteriori informazioni, consulta Usa la libreria Kinesis Client.
Consulta il codice sorgente e rivedi le informazioni riportate di seguito.
- StockTradesProcessor classe
-
La classe principale del consumatore, fornita per te, che svolge le seguenti attività:
-
Legge l'applicazione, il flusso di dati e i nomi delle regioni, passati come argomenti.
-
Crea un'
KinesisAsyncClient
istanza con il nome della regione. -
Crea un'istanza
StockTradeRecordProcessorFactory
che serve istanze diShardRecordProcessor
, implementate da un'istanzaStockTradeRecordProcessor
. -
Crea un'
ConfigsBuilder
istanza con l'StockTradeRecordProcessorFactory
istanzaKinesisAsyncClient
StreamName
ApplicationName
,, e. Questo è utile per creare tutte le configurazioni con valori predefiniti. -
Crea uno KCL scheduler (in precedenza, nelle KCL versioni 1.x era noto come KCL worker) con l'
ConfigsBuilder
istanza. -
Lo scheduler crea un nuovo thread per ciascun shard (assegnato a questa istanza consumer), che in un ciclo continuo legge i record dai flussi di dati. Quindi invoca l'istanza
StockTradeRecordProcessor
per elaborare ogni batch di record ricevuto.
-
- StockTradeRecordProcessor classe
-
Implementazione dell'istanza
StockTradeRecordProcessor
, che a sua volta implementa cinque metodi richiesti:initialize
,processRecords
,leaseLost
,shardEnded
eshutdownRequested
.I
shutdownRequested
metodiinitialize
and vengono utilizzati per consentire KCL all'elaboratore di registrazione di sapere rispettivamente quando dovrebbe essere pronto per iniziare a ricevere i record e quando dovrebbe aspettarsi di smettere di riceverli, in modo da poter eseguire qualsiasi attività di configurazione e terminazione specifiche dell'applicazione.leaseLost
eshardEnded
vengono utilizzati per implementare qualsiasi logica relativa alle azioni da intraprendere in caso di perdita di un leasing o di un'elaborazione giunta al termine di uno shard. In questo esempio, registriamo semplicemente i messaggi che indicano questi eventi.Ti forniamo il codice per questi metodi. L'elaborazione principale si verifica nel metodo
processRecords
, che a sua volta utilizzaprocessRecord
per ogni record. Quest'ultimo metodo viene fornito come codice di base per lo più vuoto da implementare nella fase successiva, dove è spiegato in modo dettagliato.Da segnalare è anche l'implementazione dei metodi di supporto per
processRecord
, ovveroreportStats
eresetStats
, che sono vuoti nel codice sorgente originale.Il metodo
processRecords
viene implementato per te ed esegue questa procedura:-
Per ogni record passato, chiama
processRecord
su di esso. -
Se è trascorso almeno 1 minuto dall'ultimo report, chiama
reportStats()
, che consente di stampare le statistiche più recenti, seguito daresetStats()
, che cancella le statistiche in modo che l'intervallo successivo includa solo i nuovi record. -
Imposta l'orario della creazione di report successiva.
-
Se è trascorso almeno 1 minuto dall'ultimo checkpoint, chiama
checkpoint()
. -
Imposta l'orario della creazione di checkpoint successiva.
Questo metodo utilizza intervalli di 60 secondi per la frequenza di creazione di report e checkpoint. Per ulteriori informazioni sul checkpointing, consulta Utilizzo della Kinesis Client Library.
-
- StockStats classe
-
Questa classe fornisce la conservazione dei dati e il monitoraggio delle statistiche per i titoli più comuni nel tempo. Questo codice viene fornito per te e include i seguenti metodi:
-
addStockTrade(StockTrade)
: inserisce un datoStockTrade
nelle statistiche in esecuzione. -
toString()
: restituisce le statistiche in una stringa formattata.
Questa classe tiene traccia delle azioni più popolari tenendo un conteggio progressivo del numero totale di scambi per ogni azione e del conteggio massimo. Aggiorna questi conteggi ogni volta che si verifica uno scambio.
-
Aggiungi codice ai metodi della classe StockTradeRecordProcessor
, come mostrato nella procedura seguente.
Per implementare il consumer
-
Implementare il metodo
processRecord
creando un'istanza di un oggettoStockTrade
delle dimensioni corrette e aggiungendo a essa i dati del record, registrando un avviso se si verifica un problema.byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
-
Implementa un
reportStats
metodo. Modifica il formato di output in base alle tue preferenze.System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
-
Implementare il metodo
resetStats
, che crea una nuova istanzastockStats
.stockStats = new StockStats();
-
Implementa i seguenti metodi richiesti dall'
ShardRecordProcessor
interfaccia:@Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
Per eseguire il consumer
-
Eseguire il producer scritto in Implementa il produttore per inserire record di scambi simulati nel flusso.
-
Verificare che la chiave di accesso e la coppia di chiavi segrete recuperate in precedenza (durante la creazione IAM dell'utente) siano salvate nel file
~/.aws/credentials
. -
Eseguire la classe
StockTradesProcessor
con i seguenti argomenti:StockTradesProcessor StockTradeStream us-west-2
Nota: se è stato creato un flusso in una regione diversa da
us-west-2
, è necessario specificare quella regione qui.
Dopo un minuto, si dovrebbe visualizzare un output come il seguente, aggiornato ogni minuto:
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
Passaggi successivi
(Facoltativo) Estendi il consumatore