Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Schritt 5: Konsument implementieren
Die Verbraucheranwendung in diesem Tutorial verarbeitet die Wertpapiertransaktionen im Daten-Stream kontinuierlich. Sie gibt dann für jede Minute die beliebtesten Aktien aus, die gekauft und verkauft wurden. Die Anwendung setzt auf Kinesis Client Library (KCL) auf, die viele der mühsamen Arbeiten einer Verbraucheranwendung übernimmt. Weitere Informationen finden Sie unter Verwenden der Kinesis Client Library.
Überprüfen Sie die folgenden Informationen in Bezug auf den Quellcode.
- StockTradesProcessor-Klasse
-
Hauptklasse des Konsumenten, die für Sie bereitgestellt wird und folgende Aufgaben übernimmt:
-
Namen von Anwendung, Daten-Stream und Region lesen, die als Argumente übergeben werden.
-
KinesisAsyncClient
-Instance mit dem Regionsnamen erstellen. -
Erstellt eine
StockTradeRecordProcessorFactory
-Instance für die Instances vonShardRecordProcessor
, implementiert von einerStockTradeRecordProcessor
-Instance. -
ConfigsBuilder
-Instance mit derKinesisAsyncClient
-,StreamName
-,ApplicationName
- und derStockTradeRecordProcessorFactory
-Instance erstellen. Dies ist für das Erstellen aller Konfigurationen mit Standardwerten nützlich. -
KCL-Scheduler (zuvor in den KCL-Versionen 1.x als KCL-Worker bezeichnet) mit der
ConfigsBuilder
-Instance erstellen. -
Der Scheduler erstellt für jeden Shard (der dieser Verbraucher-Instance zugeordnet ist) einen neuen Thread, der in einer Schleife die Datensätze aus dem Daten-Stream liest. Anschließend wird die
StockTradeRecordProcessor
-Instance aufgerufen, um die empfangenen Datensatzstapel zu verarbeiten.
-
- StockTradeRecordProcessor-Klasse
-
Implementierung der
StockTradeRecordProcessor
-Instance, die wiederum fünf erforderliche Methoden implementiert:initialize
,processRecords
,leaseLost
,shardEnded
undshutdownRequested
.Die Methoden
initialize
undshutdownRequested
werden von KCL verwendet, um dem Datensatzverarbeiter mitzuteilen, wann er bereit sein muss, Datensätze zu empfangen, und wann der Empfang von Datensätzen gestoppt werden muss, damit alle anwendungsspezifischen Einrichtungs- und Beendigungsaufgaben ausgeführt werden können.leaseLost
undshardEnded
werden verwendet, um Logik zu implementieren, die beim Verlust eines Lease oder bei Erreichen des Shards-Endes im Rahmen der Shard-Verarbeitung auszuführen ist. In diesem Beispiel protokollieren wir einfach Meldungen dieser Ereignisse.Der Code für diese Methoden wird für Sie bereitgestellt. Die wesentliche Verarbeitung erfolgt mit der
processRecords
Methode, die wiederumprocessRecord
für die einzelnen Datensätze nutzt. Die letztgenannte Methode wird als nahezu leerer Skeleton-Code bereitgestellt und im nächsten Schritt (der weitere Informationen enthält) implementiert.Beachten Sie außerdem die Implementierung der Hilfsmethoden für
processRecord
:reportStats
undresetStats
, die im ursprünglichen Quellcode leer sind.Die
processRecords
-Methode wurde für Sie implementiert und führt die folgenden Schritte aus:-
Für jeden übergebenen Datensatz wird
processRecord
aufgerufen. -
Ruft
reportStats()
zum Drucken der neuesten Statistiken auf, wenn seit dem letzten Bericht mindestens 1 Minute vergangen ist, und dannresetStats()
, um die Statistiken zu löschen, damit das nächste Intervall nur neue Datensätze enthält. -
Legt den Zeitpunkt für die nächste Berichterstellung fest.
-
Ruft
checkpoint()
auf, wenn seit dem letzten Prüfpunkt mindestens 1 Minute vergangen ist. -
Legt den Zeitpunkt für das nächste Checkpointing fest.
Diese Methode verwendet für das Checkpointing und die Berichterstellung ein Intervall von 60 Sekunden. Weitere Informationen zum Checkpointing finden Sie unter Verwendung der Kinesis Client Library.
-
- StockStats-Klasse
-
Diese Klasse stellt eine Datenaufbewahrung und eine Nachverfolgung von Statistiken für die beliebtesten Aktien bereit. Dieser Code wird für Sie bereitgestellt und enthält folgende Methoden:
-
addStockTrade(StockTrade)
: fügt die angegebeneStockTrade
in die ausgeführten Statistiken ein. -
toString()
: gibt die Statistiken als formatierte Zeichenfolge zurück.
Diese Klasse verfolgt die beliebtesten Wertpapiere, indem kontinuierlich die Anzahl der Handelstransaktionen für jedes Wertpapier sowie die maximale Anzahl gezählt werden. Sie aktualisiert diese Werte, sobald eine neue Handelstransaktion empfangen wird.
-
Fügen Sie Code zu den Methoden der StockTradeRecordProcessor
-Klasse hinzu, wie in den folgenden Schritten gezeigt.
So implementieren Sie den Konsumenten
-
Implementieren Sie die
processRecord
-Methode, indem Sie ein richtig bemessenesStockTrade
-Objekt instanziieren und die Datensatzdaten zu diesem hinzufügen, sodass im Falle eines Problems eine Warnung protokolliert wird.byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
-
Implementieren Sie eine einfache
reportStats
-Methode. Sie können das Ausgabeformat an die jeweiligen Anforderungen anpassen.System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
-
Implementieren Sie die Methode
resetStats
, die eine neuestockStats
-Instance erstellt.stockStats = new StockStats();
-
Implementieren der folgenden Methoden, die für die
ShardRecordProcessor
-Schnittstelle benötigt werden@Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
So führen Sie den Konsumenten aus
-
Führen Sie den unter Schritt 4: Produzent implementieren erstellten Produzenten aus, um simulierte Wertpapiertransaktionsdatensätze in den Stream zu schreiben.
-
Stellen Sie sicher, dass der Zugriffsschlüssel und das geheime Schlüsselpaar, die vorher (beim Erstellen des IAM-Benutzers) abgerufen wurden, in der Datei
~/.aws/credentials
gespeichert sind. -
Führen Sie die
StockTradesProcessor
-Klasse mit den folgenden Argumenten aus:StockTradesProcessor StockTradeStream us-west-2
Beachten Sie, dass Sie die Region hier angeben müssen, wenn Sie den Stream in einer anderen Region als
us-west-2
erstellt haben.
Nach einer Minute sollen Sie eine Ausgabe ähnlich der folgenden sehen, die anschließend einmal pro Minute aktualisiert wird:
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
Nächste Schritte
Schritt 6: (optional) Konsument erweitern