Schritt 4: Produzent implementieren - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Schritt 4: Produzent implementieren

Die Anwendung im Tutorial: Verarbeiten von Wertpapier-Echtzeitdaten mit KPL und KCL 1.x verwendet das reale Szenario einer Überwachung des Wertpapierhandels. Im Folgenden wird kurz erläutert, wie dieses Szenario zum Produzenten und der unterstützenden Codestruktur zugeordnet wird.

Überprüfen Sie die folgenden Informationen in Bezug auf den Quellcode.

StockTrade-Klasse

Ein bestimmter Wertpapierhandel wird durch eine Instance der StockTrade-Klasse dargestellt. Diese Instance enthält folgende Attribute: Tickersymbol, Preis, Anzahl der Anteile, Art des Handels (Kauf oder Verkauf) und ID zur eindeutigen Identifizierung der Handelsaktion. Dieser Klasse wird für Sie implementiert.

Stream-Datensatz

Ein Stream ist eine Sequenz von Datensätzen. Ein Datensatz ist die Serialisierung einer StockTrade-Instance im JSON-Format. Beispiele:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator-Klasse

StockTradeGenerator verfügt über eine Methode namens getRandomTrade(), die bei Aufruf Daten eines zufällig generierten Wertpapierhandels zurückgibt. Dieser Klasse wird für Sie implementiert.

StockTradesWriter-Klasse

Die main-Methode des Produzenten, StockTradesWriter, ruft kontinuierlich eine zufällige Handelsaktion ab und sendet die Daten an Kinesis Data Streams, indem sie die folgenden Aufgaben durchführt:

  1. Lesen des Stream- und Regionsnamen als Eingabe.

  2. Erstellen eines AmazonKinesisClientBuilder.

  3. Verwenden des Client Builder, um die Region, die Anmeldeinformationen und die Client-Konfiguration festzulegen.

  4. Erstellen eines AmazonKinesis-Clients mit dem Client-Builder.

  5. Sicherstellen, dass der Stream vorhanden und aktiv ist (wenn nicht, kommt es zu einer Beendigung mit Fehler).

  6. Aufrufen der StockTradeGenerator.getRandomTrade()-Methode in einer Dauerschleife und anschließend Aufruf der sendStockTrade-Methode, um die Handelsdaten alle 100 Millisekunden an den Stream zu senden.

Die sendStockTrade-Methode der StockTradesWriter-Klasse hat den folgenden Code:

private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }

Beachten Sie die folgende Code-Struktur:

  • Die PutRecord-API erwartet ein Byte-Array und Sie müssen trade in ein JSON-Format umwandeln. Diese einzelne Codezeile führt die Operation aus:

    byte[] bytes = trade.toJsonAsBytes();
  • Bevor Sie die Handelsdaten senden können, erstellen Sie eine neue PutRecordRequest-Instance (namens putRecord in diesem Fall):

    PutRecordRequest putRecord = new PutRecordRequest();

    Jeder PutRecord-Aufruf erfordert den Namen des Streams, einen Partitionsschlüssel und einen Daten-Blob. Der folgende Code füllt diese Felder im putRecord-Objekt mit dessen setXxxx()-Methoden:

    putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));

    Im Beispiel wird ein Stock Ticket als Partitionsschlüssel verwendet, wodurch der Datensatz einem bestimmten Shard zugeordnet wird. In der Praxis sollten Sie Hunderte oder gar Tausende von Partitionsschlüsseln pro Shard haben, sodass die Datensätze in Ihrem Stream gleichmäßig verteilt sind. Weitere Informationen zum Hinzufügen von Daten zu einem Stream finden Sie unter Hinzufügen von Daten zu einem Stream.

    putRecord ist nun für das Senden an den Client bereit (put-Operation):

    kinesisClient.putRecord(putRecord);
  • Eine Fehlerüberprüfung und Protokollierung sind immer nützliche Ergänzungen. Dieser Code protokolliert Fehlerbedingungen:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Platzieren Sie den try/catch-Block um die put-Operation herum:

    try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }

    Der Grund besteht darin, dass eine Kinesis-Data-Streams-put-Operation aufgrund eines Netzwerkfehlers fehlschlagen kann oder gedrosselt wird, weil die Durchsatzgrenze des Streams erreicht wird. Sie sollten sich Ihre Wiederholungsrichtlinie für put---Operationen sorgfältig überlegen, um Datenverluste zu vermeiden, beispielsweise durch eine einfache Wiederholung.

  • Eine Statusprotokollierung ist hilfreich, wenn auch optional:

    LOG.info("Putting trade: " + trade.toString());

Der hier gezeigte Produzent verwendet die API-Funktionalität von Kinesis Data Streams für einzelne Datensätze PutRecord. In der Praxis ist es oft effizienter, die Eignung von PutRecords für mehrere Datensätze zu nutzen und mehrere Datensatzstapel gleichzeitig zu senden, wenn ein Produzent viele Datensätze erstellt. Weitere Informationen finden Sie unter Hinzufügen von Daten zu einem Stream.

So führen Sie den Produzenten aus
  1. Stellen Sie sicher, dass der Zugriffsschlüssel und das geheime Schlüsselpaar, die vorher (beim Erstellen des IAM-Benutzers) abgerufen wurden, in der Datei ~/.aws/credentials gespeichert sind.

  2. Führen Sie die StockTradeWriter-Klasse mit den folgenden Argumenten aus:

    StockTradeStream us-west-2

    Wenn Sie Ihren Stream in einer anderen Region als us-west-2 erstellt haben, müssen Sie stattdiesen hier diese Region angeben.

Die Ausgabe sollte folgendermaßen oder ähnlich aussehen:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Ihr Stream für die Wertpapierdaten wird nun von Kinesis Data Streams eingespeist.

Nächste Schritte

Schritt 5: Konsument implementieren