Étape 4 : Implémenter le producteur - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Étape 4 : Implémenter le producteur

L'application dans le Didacticiel : Traitement en temps réel des données boursières avec KPL et KCL 1.x utilise le scénario réel de surveillance des opérations boursières. Les principes suivants expliquent brièvement comment ce scénario est mise en correspondance avec la structure de code de l'application producteur et celle qui est associée.

Reportez-vous au code source et vérifiez les informations suivantes.

Classe StockTrade

Une opération boursière individuelle est représentée par une instance de la classe StockTrade. Cette instance contient des attributs tels que le symbole boursier, le prix, le nombre d'actions, le type de l'opération (achat ou vente) et un ID identifiant l'opération de manière unique. Cette classe est implémentée pour vous.

Enregistrement de flux

Un flux est une séquence d'enregistrements. Un enregistrement est une sérialisation d'une instance StockTrade au format JSON. Par exemple :

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
Classe StockTradeGenerator

StockTradeGenerator a une méthode appelée getRandomTrade() qui renvoie une nouvelle opération boursière générée de façon aléatoire chaque fois qu'elle est appelée. Cette classe est implémentée pour vous.

Classe StockTradesWriter

La méthode main de l'application producteur, StockTradesWriter extrait en permanence une opération boursière aléatoire et l'envoie à Kinesis Data Streams en effectuant les tâches suivantes :

  1. Elle lit le nom du flux et le nom de la région comme entrée.

  2. Elle crée un AmazonKinesisClientBuilder.

  3. Elle utilise le générateur client pour définir la région, les informations d'identification et la configuration du client.

  4. Elle génère un client AmazonKinesis à l'aide du générateur client.

  5. Elle vérifie que le flux existe et qu'il est actif (sinon il se ferme et génère une erreur).

  6. Dans une boucle continue, elle appelle la méthode StockTradeGenerator.getRandomTrade(), puis la méthode sendStockTrade pour envoyer l'opération boursière au flux toutes les 100 millisecondes.

La méthode sendStockTrade de la classe StockTradesWriter comprend le code suivant :

private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }

Reportez-vous à la ventilation de code suivante :

  • L'API PutRecord attend un tableau d'octets, et vous devez convertir trade au format JSON. La seule ligne de code suivante effectue cette opération :

    byte[] bytes = trade.toJsonAsBytes();
  • Avant de pouvoir envoyer l'opération boursière, vous créez une nouvelle instance PutRecordRequest (appelée putRecord dans le cas suivant) :

    PutRecordRequest putRecord = new PutRecordRequest();

    Chaque appel de PutRecord requiert le nom du flux, la clé de partition et le blob de données. Le code suivant remplit ces champs dans l'objet putRecord à l'aide de ses méthodes setXxxx() :

    putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));

    L'exemple utilise un symbole boursier comme clé de partition qui mappe l'enregistrement à une partition spécifique. En pratique, vous devez avoir des centaines ou des milliers de clés de partition par partition afin de répartir les enregistrements de façon égale dans votre flux. Pour plus d'informations sur la façon d'ajouter des données à un flux, consultez la pageAjout de données à un flux.

    Maintenant putRecord est prêt à envoyer au client (l'opération put) :

    kinesisClient.putRecord(putRecord);
  • La vérification des erreurs et leur consignation sont toujours très utiles. Le code suivant consigne les conditions d'erreur :

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    Ajoutez le bloc try/catch autour de l'opération put :

    try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }

    En effet, une opération put de Kinesis Data Streams peut échouer en raison d'une erreur de réseau ou parce que le flux atteint ses limites de débit et est limité. Nous vous recommandons de réfléchir soigneusement à votre stratégie de tentative pour les opérations put afin d'éviter une perte de données, par exemple d'effectuer une nouvelle tentative simple.

  • La journalisation d'état est très utile, mais elle est facultative :

    LOG.info("Putting trade: " + trade.toString());

L'application producteur présentée ici utilise la fonctionnalité d'enregistrement unique de l'API Kinesis Data Streams, PutRecord. En pratique, si une application producteur génère de nombreux enregistrements, il est souvent plus efficace d'utiliser la fonctionnalité Plusieurs enregistrements de PutRecords et d'envoyer les lots d'enregistrements en même temps. Pour de plus amples informations, veuillez consulter Ajout de données à un flux.

Pour exécuter l'application producteur
  1. Vérifiez que la paire de clé d'accès et de clé secrète extraites précédemment (lors de la création de l'utilisateur IAM) sont enregistrées dans le fichier ~/.aws/credentials.

  2. Exécutez la classe StockTradeWriter avec les arguments suivants :

    StockTradeStream us-west-2

    Si vous avez créé votre flux dans une région autre que us-west-2, vous devez spécifier cette région ici.

Vous devez voir des résultats similaires à ce qui suit :

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Votre flux d'opérations boursières est maintenant en cours d'ingestion par Kinesis Data Streams.

Étapes suivantes

Étape 5 : Implémenter le consommateur