Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Étape 4 : Implémenter le producteur
Ce didacticiel utilise le scénario réel de surveillance des opérations boursières. Les principes suivants expliquent brièvement comment ce scénario est mis en correspondance avec l’application producteur et la structure de code associée.
Reportez-vous au code source
- Classe StockTrade
-
Une opération boursière individuelle est représentée par une instance de la classe StockTrade. Cette instance contient des attributs tels que le symbole boursier, le prix, le nombre d'actions, le type de l'opération (achat ou vente) et un ID identifiant l'opération de manière unique. Cette classe est implémentée pour vous.
- Enregistrement de flux
-
Un flux est une séquence d'enregistrements. Un enregistrement est une sérialisation d'une instance
StockTrade
au format JSON. Par exemple :{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
- Classe StockTradeGenerator
-
La Classe StockTradeGenerator possède une méthode appelée
getRandomTrade()
qui renvoie une nouvelle opération boursière générée de façon aléatoire chaque fois qu'elle est appelée. Cette classe est implémentée pour vous. - Classe StockTradesWriter
-
La méthode
main
de l'application producteur, StockTradesWriter extrait en permanence une opération boursière aléatoire et l'envoie à Kinesis Data Streams en effectuant les tâches suivantes :-
Elle lit le nom du flux et le nom de la région comme entrée.
-
Elle utilise le
KinesisAsyncClientBuilder
pour définir la région, les informations d'identification et la configuration du client. -
Elle vérifie que le flux existe et qu'il est actif (sinon il se ferme et génère une erreur).
-
Dans une boucle continue, elle appelle la méthode
StockTradeGenerator.getRandomTrade()
, puis la méthodesendStockTrade
pour envoyer l'opération boursière au flux toutes les 100 millisecondes.
La méthode
sendStockTrade
de la classeStockTradesWriter
comprend le code suivant :private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }
Reportez-vous à la ventilation de code suivante :
-
L'API
PutRecord
attend un tableau d'octets, et vous devez convertir l’opération au format JSON. La seule ligne de code suivante effectue cette opération :byte[] bytes = trade.toJsonAsBytes();
-
Avant d’envoyer l'opération, vous créez une nouvelle instance
PutRecordRequest
(appelée request dans le cas suivant) : Chaque appel derequest
requiert le nom du flux, la clé de partition et un blob de données.PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();
L'exemple utilise un symbole boursier comme clé de partition qui mappe l'enregistrement à une partition spécifique. En pratique, vous devez avoir des centaines ou des milliers de clés de partition par partition afin de répartir les enregistrements de façon égale dans votre flux. Pour plus d'informations sur la façon d'ajouter des données à un flux, consultez la pageÉcrire des données sur Amazon Kinesis Data Streams.
Maintenant
request
est prêt à envoyer au client (l'opération put) :kinesisClient.putRecord(request).get();
-
La vérification des erreurs et leur consignation sont toujours très utiles. Le code suivant consigne les conditions d'erreur :
if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }
Ajoutez le bloc try/catch autour de l'opération
put
:try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }
Une opération put de Kinesis Data Streams peut échouer en raison d’une erreur réseau ou si le flux de données atteint ses limites de débit. Nous vous recommandons de réfléchir soigneusement à votre stratégie de tentative pour les opérations
put
afin d'éviter une perte de données, par exemple d'effectuer une nouvelle tentative simple. -
La journalisation d'état est très utile, mais elle est facultative :
LOG.info("Putting trade: " + trade.toString());
L'application producteur présentée ici utilise la fonctionnalité d'enregistrement unique de l'API Kinesis Data Streams,
PutRecord
. En pratique, si une application producteur génère de nombreux enregistrements, il est souvent plus efficace d'utiliser la fonctionnalité Plusieurs enregistrements dePutRecords
et d'envoyer les lots d'enregistrements en même temps. Pour de plus amples informations, veuillez consulter Écrire des données sur Amazon Kinesis Data Streams. -
Pour exécuter l'application producteur
-
Vérifiez que la clé d'accès et la paire de clés secrètes récupérées dans Étape 2 : Création d'un utilisateur et d'une politique IAM sont enregistrées dans le fichier
~/.aws/credentials
. -
Exécutez la classe
StockTradeWriter
avec les arguments suivants :StockTradeStream us-west-2
Si vous avez créé votre flux dans une région autre que
us-west-2
, vous devez spécifier cette région ici.
Vous devez voir des résultats similaires à ce qui suit :
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18
Feb 16, 2015 3:53:00 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85
Feb 16, 2015 3:53:01 PM
com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade
INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08
Vos transactions boursières sont maintenant ingérées par Kinesis Data Streams.
Étapes suivantes
Étape 5 : Implémenter le consommateur