Implementieren Sie den Hersteller - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Implementieren Sie den Hersteller

Dieses Tutorial verwendet das reale Szenario einer Überwachung des Wertpapierhandels. Die folgenden Prinzipien erläutern kurz, wie dieses Szenario zum Produzenten und seiner unterstützenden Codestruktur passt.

Beachten Sie den Quellcode und prüfen Sie die folgenden Informationen.

StockTrade Klasse

Ein einzelner Aktienhandel wird durch eine Instanz der StockTrade Klasse repräsentiert. Diese Instance enthält folgende Attribute: Tickersymbol, Preis, Anzahl der Anteile, Art des Handels (Kauf oder Verkauf) und ID zur eindeutigen Identifizierung der Handelsaktion. Dieser Klasse wird für Sie implementiert.

Stream-Datensatz

Ein Stream ist eine Sequenz von Datensätzen. Ein Datensatz ist eine Serialisierung einer StockTrade Instanz im JSON Format. Beispielsweise:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator Klasse

StockTradeGenerator hat eine Methode namensgetRandomTrade(), die bei jedem Aufruf einen neuen zufällig generierten Aktienhandel zurückgibt. Dieser Klasse wird für Sie implementiert.

StockTradesWriter Klasse

Die main Methode des Herstellers ruft StockTradesWriter kontinuierlich einen zufälligen Trade ab und sendet ihn dann an Kinesis Data Streams, indem sie die folgenden Aufgaben ausführt:

  1. Liest den Namen des Datenstreams und den Namen der Region als Eingabe.

  2. Verwendet dieKinesisAsyncClientBuilder, um die Region, die Anmeldeinformationen und die Client-Konfiguration festzulegen.

  3. Sicherstellen, dass der Stream vorhanden und aktiv ist (wenn nicht, kommt es zu einer Beendigung mit Fehler).

  4. Aufrufen der StockTradeGenerator.getRandomTrade()-Methode in einer Dauerschleife und anschließend Aufruf der sendStockTrade-Methode, um die Handelsdaten alle 100 Millisekunden an den Stream zu senden.

Die sendStockTrade-Methode der StockTradesWriter-Klasse hat den folgenden Code:

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

Beachten Sie die folgende Code-Struktur:

  • The PutRecord API erwartet ein Byte-Array, und Sie müssen den Handel in JSON ein Format konvertieren. Diese einzelne Codezeile führt die Operation aus:

    byte[] bytes = trade.toJsonAsBytes();
  • Bevor Sie die Transaktion senden können, erstellen Sie eine neue PutRecordRequest-Instance (in diesem Fall Anforderung genannt). Jede request benötigt den Namen des Streams, einen Partitionsschlüssel und einen Daten-Blob.

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    Das Beispiel verwendet einen Börsenticker als Partitionsschlüssel, der den Datensatz einem bestimmten Shard zuordnet. In der Praxis sollten Sie Hunderte oder gar Tausende von Partitionsschlüsseln pro Shard haben, sodass die Datensätze in Ihrem Stream gleichmäßig verteilt sind. Weitere Informationen zum Hinzufügen von Daten zu einem Stream finden Sie unter Daten in Amazon Kinesis Data Streams schreiben.

    request kann die Daten jetzt an den Client senden (PUT-Operation):

    kinesisClient.putRecord(request).get();
  • Eine Fehlerüberprüfung und Protokollierung sind immer nützliche Ergänzungen. Dieser Code protokolliert Fehlerbedingungen:

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Platzieren Sie den try/catch-Block um die put-Operation herum:

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    Der Grund besteht darin, dass eine Kinesis Data Streams-PUT-Operation aufgrund eines Netzwerkfehlers fehlschlagen kann oder gedrosselt wird, weil die Durchsatzgrenze des Streams erreicht wird. Es wird empfohlen, dass Sie Ihre Wiederholungsrichtlinie für put Operationen zur Vermeidung von Datenverlusten, wie z. B. die Verwendung eines Wiederholungsversuchs, sorgfältig prüfen.

  • Eine Statusprotokollierung ist hilfreich, wenn auch optional:

    LOG.info("Putting trade: " + trade.toString());

Der hier gezeigte Hersteller verwendet die API Einzeldatensatzfunktion von Kinesis Data Streams,PutRecord. In der Praxis ist es oft effizienter, die Eignung von PutRecords für mehrere Datensätze zu nutzen und mehrere Datensatzstapel gleichzeitig zu senden, wenn ein Produzent viele Datensätze erstellt. Weitere Informationen finden Sie unter Daten in Amazon Kinesis Data Streams schreiben.

So führen Sie den Produzenten aus
  1. Verifizieren Sie, dass der in Erstellen Sie eine IAM Richtlinie und einen Benutzer abgerufene Zugriffsschlüssel samt geheimem Schlüsselpaar in der Datei ~/.aws/credentials gespeichert wurde.

  2. Führen Sie die StockTradeWriter-Klasse mit den folgenden Argumenten aus:

    StockTradeStream us-west-2

    Wenn Sie den Stream in einer anderen Region als us-west-2 erstellt haben, müssen Sie stattdessen hier diese Region angeben.

Die Ausgabe sollte folgendermaßen oder ähnlich aussehen:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Ihre Wertpapierdaten werden nun von Kinesis Data Streams eingespeist.

Nächste Schritte

Implementieren Sie den Verbraucher