Schritt 5: Konsument implementieren - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Schritt 5: Konsument implementieren

Die Verbraucheranwendung in diesem Tutorial verarbeitet die Wertpapiertransaktionen im Daten-Stream kontinuierlich. Sie gibt dann für jede Minute die beliebtesten Aktien aus, die gekauft und verkauft wurden. Die Anwendung setzt auf Kinesis Client Library (KCL) auf, die viele der mühsamen Arbeiten einer Verbraucheranwendung übernimmt. Weitere Informationen finden Sie unter Verwenden der Kinesis Client Library.

Überprüfen Sie die folgenden Informationen in Bezug auf den Quellcode.

StockTradesProcessor-Klasse

Hauptklasse des Konsumenten, die für Sie bereitgestellt wird und folgende Aufgaben übernimmt:

  • Namen von Anwendung, Daten-Stream und Region lesen, die als Argumente übergeben werden.

  • KinesisAsyncClient-Instance mit dem Regionsnamen erstellen.

  • Erstellt eine StockTradeRecordProcessorFactory-Instance für die Instances von ShardRecordProcessor, implementiert von einer StockTradeRecordProcessor-Instance.

  • ConfigsBuilder-Instance mit der KinesisAsyncClient-, StreamName-, ApplicationName- und der StockTradeRecordProcessorFactory-Instance erstellen. Dies ist für das Erstellen aller Konfigurationen mit Standardwerten nützlich.

  • KCL-Scheduler (zuvor in den KCL-Versionen 1.x als KCL-Worker bezeichnet) mit der ConfigsBuilder-Instance erstellen.

  • Der Scheduler erstellt für jeden Shard (der dieser Verbraucher-Instance zugeordnet ist) einen neuen Thread, der in einer Schleife die Datensätze aus dem Daten-Stream liest. Anschließend wird die StockTradeRecordProcessor-Instance aufgerufen, um die empfangenen Datensatzstapel zu verarbeiten.

StockTradeRecordProcessor-Klasse

Implementierung der StockTradeRecordProcessor-Instance, die wiederum fünf erforderliche Methoden implementiert: initialize, processRecords, leaseLost, shardEnded und shutdownRequested.

Die Methoden initialize und shutdownRequested werden von KCL verwendet, um dem Datensatzverarbeiter mitzuteilen, wann er bereit sein muss, Datensätze zu empfangen, und wann der Empfang von Datensätzen gestoppt werden muss, damit alle anwendungsspezifischen Einrichtungs- und Beendigungsaufgaben ausgeführt werden können. leaseLost und shardEnded werden verwendet, um Logik zu implementieren, die beim Verlust eines Lease oder bei Erreichen des Shards-Endes im Rahmen der Shard-Verarbeitung auszuführen ist. In diesem Beispiel protokollieren wir einfach Meldungen dieser Ereignisse.

Der Code für diese Methoden wird für Sie bereitgestellt. Die wesentliche Verarbeitung erfolgt mit der processRecords Methode, die wiederum processRecord für die einzelnen Datensätze nutzt. Die letztgenannte Methode wird als nahezu leerer Skeleton-Code bereitgestellt und im nächsten Schritt (der weitere Informationen enthält) implementiert.

Beachten Sie außerdem die Implementierung der Hilfsmethoden für processRecord: reportStats und resetStats, die im ursprünglichen Quellcode leer sind.

Die processRecords-Methode wurde für Sie implementiert und führt die folgenden Schritte aus:

  • Für jeden übergebenen Datensatz wird processRecord aufgerufen.

  • Ruft reportStats() zum Drucken der neuesten Statistiken auf, wenn seit dem letzten Bericht mindestens 1 Minute vergangen ist, und dann resetStats(), um die Statistiken zu löschen, damit das nächste Intervall nur neue Datensätze enthält.

  • Legt den Zeitpunkt für die nächste Berichterstellung fest.

  • Ruft checkpoint() auf, wenn seit dem letzten Prüfpunkt mindestens 1 Minute vergangen ist.

  • Legt den Zeitpunkt für das nächste Checkpointing fest.

Diese Methode verwendet für das Checkpointing und die Berichterstellung ein Intervall von 60 Sekunden. Weitere Informationen zum Checkpointing finden Sie unter Verwendung der Kinesis Client Library.

StockStats-Klasse

Diese Klasse stellt eine Datenaufbewahrung und eine Nachverfolgung von Statistiken für die beliebtesten Aktien bereit. Dieser Code wird für Sie bereitgestellt und enthält folgende Methoden:

  • addStockTrade(StockTrade): fügt die angegebene StockTrade in die ausgeführten Statistiken ein.

  • toString(): gibt die Statistiken als formatierte Zeichenfolge zurück.

Diese Klasse verfolgt die beliebtesten Wertpapiere, indem kontinuierlich die Anzahl der Handelstransaktionen für jedes Wertpapier sowie die maximale Anzahl gezählt werden. Sie aktualisiert diese Werte, sobald eine neue Handelstransaktion empfangen wird.

Fügen Sie Code zu den Methoden der StockTradeRecordProcessor-Klasse hinzu, wie in den folgenden Schritten gezeigt.

So implementieren Sie den Konsumenten
  1. Implementieren Sie die processRecord-Methode, indem Sie ein richtig bemessenes StockTrade-Objekt instanziieren und die Datensatzdaten zu diesem hinzufügen, sodass im Falle eines Problems eine Warnung protokolliert wird.

    byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
  2. Implementieren Sie eine einfache reportStats-Methode. Sie können das Ausgabeformat an die jeweiligen Anforderungen anpassen.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. Implementieren Sie die Methode resetStats, die eine neue stockStats-Instance erstellt.

    stockStats = new StockStats();
  4. Implementieren der folgenden Methoden, die für die ShardRecordProcessor-Schnittstelle benötigt werden

    @Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
So führen Sie den Konsumenten aus
  1. Führen Sie den unter Schritt 4: Produzent implementieren erstellten Produzenten aus, um simulierte Wertpapiertransaktionsdatensätze in den Stream zu schreiben.

  2. Stellen Sie sicher, dass der Zugriffsschlüssel und das geheime Schlüsselpaar, die vorher (beim Erstellen des IAM-Benutzers) abgerufen wurden, in der Datei ~/.aws/credentials gespeichert sind.

  3. Führen Sie die StockTradesProcessor-Klasse mit den folgenden Argumenten aus:

    StockTradesProcessor StockTradeStream us-west-2

    Beachten Sie, dass Sie die Region hier angeben müssen, wenn Sie den Stream in einer anderen Region als us-west-2 erstellt haben.

Nach einer Minute sollen Sie eine Ausgabe ähnlich der folgenden sehen, die anschließend einmal pro Minute aktualisiert wird:

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

Nächste Schritte

Schritt 6: (optional) Konsument erweitern