Étape 4 : Implémenter le producteur - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Étape 4 : Implémenter le producteur

Ce didacticiel utilise le scénario réel de surveillance des opérations boursières. Les principes suivants expliquent brièvement comment ce scénario est mis en correspondance avec l’application producteur et la structure de code associée.

Reportez-vous au code source et vérifiez les informations suivantes.

Classe StockTrade

Une opération boursière individuelle est représentée par une instance de la classe StockTrade. Cette instance contient des attributs tels que le symbole boursier, le prix, le nombre d'actions, le type de l'opération (achat ou vente) et un ID identifiant l'opération de manière unique. Cette classe est implémentée pour vous.

Enregistrement de flux

Un flux est une séquence d'enregistrements. Un enregistrement est une sérialisation d'une instance StockTrade au format JSON. Par exemple :

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
Classe StockTradeGenerator

La Classe StockTradeGenerator possède une méthode appelée getRandomTrade() qui renvoie une nouvelle opération boursière générée de façon aléatoire chaque fois qu'elle est appelée. Cette classe est implémentée pour vous.

Classe StockTradesWriter

La méthode main de l'application producteur, StockTradesWriter extrait en permanence une opération boursière aléatoire et l'envoie à Kinesis Data Streams en effectuant les tâches suivantes :

  1. Elle lit le nom du flux et le nom de la région comme entrée.

  2. Elle utilise le KinesisAsyncClientBuilder pour définir la région, les informations d'identification et la configuration du client.

  3. Elle vérifie que le flux existe et qu'il est actif (sinon il se ferme et génère une erreur).

  4. Dans une boucle continue, elle appelle la méthode StockTradeGenerator.getRandomTrade(), puis la méthode sendStockTrade pour envoyer l'opération boursière au flux toutes les 100 millisecondes.

La méthode sendStockTrade de la classe StockTradesWriter comprend le code suivant :

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

Reportez-vous à la ventilation de code suivante :

  • L'API PutRecord attend un tableau d'octets, et vous devez convertir l’opération au format JSON. La seule ligne de code suivante effectue cette opération :

    byte[] bytes = trade.toJsonAsBytes();
  • Avant d’envoyer l'opération, vous créez une nouvelle instance PutRecordRequest (appelée request dans le cas suivant) : Chaque appel de request requiert le nom du flux, la clé de partition et un blob de données.

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    L'exemple utilise un symbole boursier comme clé de partition qui mappe l'enregistrement à une partition spécifique. En pratique, vous devez avoir des centaines ou des milliers de clés de partition par partition afin de répartir les enregistrements de façon égale dans votre flux. Pour plus d'informations sur la façon d'ajouter des données à un flux, consultez la pageÉcrire des données sur Amazon Kinesis Data Streams.

    Maintenant request est prêt à envoyer au client (l'opération put) :

    kinesisClient.putRecord(request).get();
  • La vérification des erreurs et leur consignation sont toujours très utiles. Le code suivant consigne les conditions d'erreur :

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Ajoutez le bloc try/catch autour de l'opération put :

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    Une opération put de Kinesis Data Streams peut échouer en raison d’une erreur réseau ou si le flux de données atteint ses limites de débit. Nous vous recommandons de réfléchir soigneusement à votre stratégie de tentative pour les opérations put afin d'éviter une perte de données, par exemple d'effectuer une nouvelle tentative simple.

  • La journalisation d'état est très utile, mais elle est facultative :

    LOG.info("Putting trade: " + trade.toString());

L'application producteur présentée ici utilise la fonctionnalité d'enregistrement unique de l'API Kinesis Data Streams, PutRecord. En pratique, si une application producteur génère de nombreux enregistrements, il est souvent plus efficace d'utiliser la fonctionnalité Plusieurs enregistrements de PutRecords et d'envoyer les lots d'enregistrements en même temps. Pour de plus amples informations, veuillez consulter Écrire des données sur Amazon Kinesis Data Streams.

Pour exécuter l'application producteur
  1. Vérifiez que la clé d'accès et la paire de clés secrètes récupérées dans Étape 2 : Création d'un utilisateur et d'une politique IAM sont enregistrées dans le fichier ~/.aws/credentials.

  2. Exécutez la classe StockTradeWriter avec les arguments suivants :

    StockTradeStream us-west-2

    Si vous avez créé votre flux dans une région autre que us-west-2, vous devez spécifier cette région ici.

Vous devez voir des résultats similaires à ce qui suit :

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Vos transactions boursières sont maintenant ingérées par Kinesis Data Streams.

Étapes suivantes

Étape 5 : Implémenter le consommateur