Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Mettre en œuvre le consommateur
L'application consommateur de ce didacticiel traite en continu les transactions boursières dans votre flux de données. Elle génère ensuite les actions les plus populaires achetées et vendues toutes les minutes. L'application est construite sur la base de la bibliothèque cliente Kinesis (KCL), qui effectue une grande partie du travail fastidieux propre aux applications grand public. Pour de plus amples informations, veuillez consulter Utiliser la bibliothèque cliente Kinesis.
Reportez-vous au code source et vérifiez les informations suivantes.
- StockTradesProcessor classe
-
La classe principale du consommateur, fournie pour vous, qui exécute les tâches suivantes :
-
Lit les noms de l'application, du flux de données et des régions, transmis sous forme d'arguments.
-
Crée une
KinesisAsyncClient
instance avec le nom de la région. -
Crée une instance
StockTradeRecordProcessorFactory
qui sert les instances deShardRecordProcessor
, implémentée par une instanceStockTradeRecordProcessor
. -
Crée une
ConfigsBuilder
instance avec l'StockTradeRecordProcessorFactory
instanceKinesisAsyncClient
StreamName
ApplicationName
,, et. Ceci est utile pour créer toutes les configurations avec des valeurs par défaut. -
Crée un KCL planificateur (auparavant, dans KCL les versions 1.x, il était connu sous le nom de KCL worker) avec l'instance.
ConfigsBuilder
-
Le planificateur crée un nouveau thread pour chaque partition (affectée à cette instance de consommateur), qui fonctionne en boucle pour lire des enregistrements dans le flux de données. Elle appelle alors l'instance
StockTradeRecordProcessor
pour traiter chaque lot d'enregistrements reçu.
-
- StockTradeRecordProcessor classe
-
Implémentation de l'instance
StockTradeRecordProcessor
, qui implémente à son tour cinq méthodes requises :initialize
,processRecords
,leaseLost
,shardEnded
etshutdownRequested
.Les
shutdownRequested
méthodesinitialize
et sont utilisées par le processeur d'enregistrements KCL pour indiquer au processeur d'enregistrements quand il doit être prêt à commencer à recevoir des enregistrements et quand il doit s'attendre à arrêter de recevoir des enregistrements, respectivement, afin qu'il puisse effectuer toutes les tâches de configuration et de terminaison spécifiques à l'application.leaseLost
etshardEnded
sont utilisés pour implémenter toute logique indiquant ce qu'il faut faire lorsqu'un bail est perdu ou qu'un traitement a atteint la fin d'une partition. Dans cet exemple, nous enregistrons simplement les messages indiquant ces événements.Le code de ces méthodes est fourni. Le traitement principal se déroule dans la méthode
processRecords
, qui utilise à son tourprocessRecord
pour chaque enregistrement. Cette dernière méthode est fournie la plupart du temps sous forme de squelette de code vide à implémenter à l'étape suivante qui fournit des explications détaillées.Notez également l'implémentation des méthodes d'assistance pour
processRecord
:reportStats
etresetStats
, qui sont vides dans le code source d'origine.La méthode
processRecords
est implémentée pour vous et effectue les opérations suivantes :-
Pour chaque enregistrement passé, il appelle
processRecord
dessus. -
Si au moins 1 minute s'est écoulée depuis le dernier rapport, il appelle
reportStats()
, qui imprime les dernières statistiques, puis deresetStats()
, qui efface les statistiques afin que l'intervalle suivant n'inclut que les nouveaux enregistrements. -
Règle l'heure du rapport suivant.
-
Si au moins 1 minute s'est écoulée depuis le dernier point de contrôle, il appelle
checkpoint()
. -
Règle l'heure du point de contrôle suivante.
Cette méthode utilise des intervalles de 60 secondes pour la fréquence de création de rapports et de point de contrôle. Pour plus d'informations sur les points de contrôle, consultez la section Utilisation de Kinesis Client Library (français non garanti).
-
- StockStats classe
-
Cette classe prend en charge la conservation des données et le suivi des statistiques dans le temps pour les actions les plus populaires. Ce code est fourni pour vous et contient les méthodes suivantes :
-
addStockTrade(StockTrade)
: Injecte leStockTrade
donné dans les statistiques en cours d'exécution. -
toString()
: renvoie les statistiques dans une chaîne formatée.
Cette classe suit les actions les plus populaires en comptabilisant le nombre total de transactions pour chaque action et le nombre maximum. Elle met à jour ces chiffres chaque fois qu'une opération boursière arrive.
-
Ajoutez du code aux méthodes de la classe StockTradeRecordProcessor
, comme l'illustrent les étapes suivantes.
Pour implémenter l'application consommateur
-
Implémentez la méthode
processRecord
en instanciant un objetStockTrade
correctement dimensionné et en lui ajoutant les données d'enregistrement, puis en consignant un avertissement en cas de problème.byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
-
Implémentez une
reportStats
méthode. Modifiez le format de sortie en fonction de vos préférences.System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
-
Implémentez la méthode
resetStats
, qui crée une nouvelle instancestockStats
.stockStats = new StockStats();
-
Implémentez les méthodes suivantes requises par
ShardRecordProcessor
l'interface :@Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
Pour exécuter l'application consommateur
-
Exécutez l'application producteur que vous avez écrite dans Implémenter le producteur pour injecter des enregistrements d'opérations boursières simulées dans votre flux.
-
Vérifiez que la clé d'accès et la paire de clés secrètes récupérées précédemment (lors de la création de IAM l'utilisateur) sont enregistrées dans le fichier
~/.aws/credentials
. -
Exécutez la classe
StockTradesProcessor
avec les arguments suivants :StockTradesProcessor StockTradeStream us-west-2
Notez que, si vous avez créé votre flux dans une région autre que
us-west-2
, vous devez spécifier cette région ici.
Au bout d'une minute, vous devez voir une sortie similaire à la suivante, actualisée toutes les minutes :
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
Étapes suivantes
(Facultatif) Étendre le consommateur