Mettre en œuvre le consommateur - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Mettre en œuvre le consommateur

L'application consommateur de ce didacticiel traite en continu les transactions boursières dans votre flux de données. Elle génère ensuite les actions les plus populaires achetées et vendues toutes les minutes. L'application est construite sur la base de la bibliothèque cliente Kinesis (KCL), qui effectue une grande partie du travail fastidieux propre aux applications grand public. Pour de plus amples informations, veuillez consulter Utiliser la bibliothèque cliente Kinesis.

Reportez-vous au code source et vérifiez les informations suivantes.

StockTradesProcessor classe

La classe principale du consommateur, fournie pour vous, qui exécute les tâches suivantes :

  • Lit les noms de l'application, du flux de données et des régions, transmis sous forme d'arguments.

  • Crée une KinesisAsyncClient instance avec le nom de la région.

  • Crée une instance StockTradeRecordProcessorFactory qui sert les instances de ShardRecordProcessor, implémentée par une instance StockTradeRecordProcessor.

  • Crée une ConfigsBuilder instance avec l'StockTradeRecordProcessorFactoryinstance KinesisAsyncClient StreamNameApplicationName,, et. Ceci est utile pour créer toutes les configurations avec des valeurs par défaut.

  • Crée un KCL planificateur (auparavant, dans KCL les versions 1.x, il était connu sous le nom de KCL worker) avec l'instance. ConfigsBuilder

  • Le planificateur crée un nouveau thread pour chaque partition (affectée à cette instance de consommateur), qui fonctionne en boucle pour lire des enregistrements dans le flux de données. Elle appelle alors l'instance StockTradeRecordProcessor pour traiter chaque lot d'enregistrements reçu.

StockTradeRecordProcessor classe

Implémentation de l'instance StockTradeRecordProcessor, qui implémente à son tour cinq méthodes requises : initialize, processRecords, leaseLost, shardEnded et shutdownRequested.

Les shutdownRequested méthodes initialize et sont utilisées par le processeur d'enregistrements KCL pour indiquer au processeur d'enregistrements quand il doit être prêt à commencer à recevoir des enregistrements et quand il doit s'attendre à arrêter de recevoir des enregistrements, respectivement, afin qu'il puisse effectuer toutes les tâches de configuration et de terminaison spécifiques à l'application. leaseLostet shardEnded sont utilisés pour implémenter toute logique indiquant ce qu'il faut faire lorsqu'un bail est perdu ou qu'un traitement a atteint la fin d'une partition. Dans cet exemple, nous enregistrons simplement les messages indiquant ces événements.

Le code de ces méthodes est fourni. Le traitement principal se déroule dans la méthode processRecords, qui utilise à son tour processRecord pour chaque enregistrement. Cette dernière méthode est fournie la plupart du temps sous forme de squelette de code vide à implémenter à l'étape suivante qui fournit des explications détaillées.

Notez également l'implémentation des méthodes d'assistance pour processRecord : reportStats et resetStats, qui sont vides dans le code source d'origine.

La méthode processRecords est implémentée pour vous et effectue les opérations suivantes :

  • Pour chaque enregistrement passé, il appelle processRecord dessus.

  • Si au moins 1 minute s'est écoulée depuis le dernier rapport, il appelle reportStats(), qui imprime les dernières statistiques, puis de resetStats(), qui efface les statistiques afin que l'intervalle suivant n'inclut que les nouveaux enregistrements.

  • Règle l'heure du rapport suivant.

  • Si au moins 1 minute s'est écoulée depuis le dernier point de contrôle, il appelle checkpoint().

  • Règle l'heure du point de contrôle suivante.

Cette méthode utilise des intervalles de 60 secondes pour la fréquence de création de rapports et de point de contrôle. Pour plus d'informations sur les points de contrôle, consultez la section Utilisation de Kinesis Client Library (français non garanti).

StockStats classe

Cette classe prend en charge la conservation des données et le suivi des statistiques dans le temps pour les actions les plus populaires. Ce code est fourni pour vous et contient les méthodes suivantes :

  • addStockTrade(StockTrade) : Injecte le StockTrade donné dans les statistiques en cours d'exécution.

  • toString() : renvoie les statistiques dans une chaîne formatée.

Cette classe suit les actions les plus populaires en comptabilisant le nombre total de transactions pour chaque action et le nombre maximum. Elle met à jour ces chiffres chaque fois qu'une opération boursière arrive.

Ajoutez du code aux méthodes de la classe StockTradeRecordProcessor, comme l'illustrent les étapes suivantes.

Pour implémenter l'application consommateur
  1. Implémentez la méthode processRecord en instanciant un objet StockTrade correctement dimensionné et en lui ajoutant les données d'enregistrement, puis en consignant un avertissement en cas de problème.

    byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
  2. Implémentez une reportStats méthode. Modifiez le format de sortie en fonction de vos préférences.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. Implémentez la méthode resetStats, qui crée une nouvelle instance stockStats.

    stockStats = new StockStats();
  4. Implémentez les méthodes suivantes requises par ShardRecordProcessor l'interface :

    @Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
Pour exécuter l'application consommateur
  1. Exécutez l'application producteur que vous avez écrite dans Implémenter le producteur pour injecter des enregistrements d'opérations boursières simulées dans votre flux.

  2. Vérifiez que la clé d'accès et la paire de clés secrètes récupérées précédemment (lors de la création de IAM l'utilisateur) sont enregistrées dans le fichier~/.aws/credentials.

  3. Exécutez la classe StockTradesProcessor avec les arguments suivants :

    StockTradesProcessor StockTradeStream us-west-2

    Notez que, si vous avez créé votre flux dans une région autre que us-west-2, vous devez spécifier cette région ici.

Au bout d'une minute, vous devez voir une sortie similaire à la suivante, actualisée toutes les minutes :

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

Étapes suivantes

(Facultatif) Étendre le consommateur