Mettre en œuvre le consommateur - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Mettre en œuvre le consommateur

L'application consommateur dans Tutoriel : Traitez les données boursières en temps réel à l'aide KPL de et KCL 1.x traite en continue le flux d'opérations boursières que vous avez créé dans Implémenter le producteur. Elle génère ensuite les actions les plus populaires achetées et vendues toutes les minutes. L'application est construite sur la base de la bibliothèque cliente Kinesis (KCL), qui effectue une grande partie du travail fastidieux propre aux applications grand public. Pour de plus amples informations, veuillez consulter Développez des KCL consommateurs 1.x.

Reportez-vous au code source et vérifiez les informations suivantes.

StockTradesProcessor classe

Classe principale de l'application consommateur, qui est fournie pour vous et effectue les tâches suivantes :

  • Lit les noms d'application, de flux et de région transmis comme arguments.

  • Lit les informations d'identification à partir de ~/.aws/credentials.

  • Crée une instance RecordProcessorFactory qui sert les instances de RecordProcessor, implémentée par une instance StockTradeRecordProcessor.

  • Crée un KCL worker avec l'RecordProcessorFactoryinstance et une configuration standard comprenant le nom du flux, les informations d'identification et le nom de l'application.

  • L'application de travail crée un nouveau thread pour chaque partition (affectée à cette instance de consommateur), qui fonctionne en boucle pour lire des enregistrements à partir de Kinesis Data Streams. Elle appelle alors l'instance RecordProcessor pour traiter chaque lot d'enregistrements reçu.

StockTradeRecordProcessor classe

Implémentation de l'instance RecordProcessor, qui implémente à son tour trois méthodes requises : initialize, processRecords et shutdown.

Comme leur nom l'indique, les méthodes initialize et shutdown sont utilisées par la Kinesis Client Library pour informer le processeur d'enregistrements du moment où il doit être prêt à recevoir des enregistrements et où il doit s'attendre à arrêter de recevoir des enregistrements afin d'effectuer des tâches d'installation et de mise hors service propres à l'application. Le code correspondant vous est fourni. Le traitement principal se déroule dans la méthode processRecords, qui utilise à son tour processRecord pour chaque enregistrement. La méthode processRecord est fournie la plupart du temps sous forme de squelette de code vide à implémenter à l'étape suivante qui fournit des explications détaillées.

Notez également l'implémentation des méthodes d'assistance pour processRecord : reportStats et resetStats, qui sont vides dans le code source d'origine.

La méthode processRecords est implémentée pour vous et effectue les opérations suivantes :

  • Pour chaque enregistrement passé, il appelle processRecord dessus.

  • Si au moins 1 minute s'est écoulée depuis le dernier rapport, il appelle reportStats(), qui imprime les dernières statistiques, puis de resetStats(), qui efface les statistiques afin que l'intervalle suivant n'inclut que les nouveaux enregistrements.

  • Règle l'heure du rapport suivant.

  • Si au moins 1 minute s'est écoulée depuis le dernier point de contrôle, il appelle checkpoint().

  • Règle l'heure du point de contrôle suivante.

Cette méthode utilise des intervalles de 60 secondes pour la fréquence de création de rapports et de point de contrôle. Pour en savoir plus sur les points de contrôle, consultez Informations supplémentaires sur le consommateur.

StockStats classe

Cette classe prend en charge la conservation des données et le suivi des statistiques dans le temps pour les actions les plus populaires. Ce code est fourni pour vous et contient les méthodes suivantes :

  • addStockTrade(StockTrade) : Injecte le StockTrade donné dans les statistiques en cours d'exécution.

  • toString() : renvoie les statistiques dans une chaîne formatée.

Cette classe suit les actions les plus populaires en comptabilisant le nombre total de transactions pour chaque action et le nombre maximum. Elle met à jour ces chiffres chaque fois qu'une opération boursière arrive.

Ajoutez du code aux méthodes de la classe StockTradeRecordProcessor, comme l'illustrent les étapes suivantes.

Pour implémenter l'application consommateur
  1. Implémentez la méthode processRecord en instanciant un objet StockTrade correctement dimensionné et en lui ajoutant les données d'enregistrement, puis en consignant un avertissement en cas de problème.

    StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
  2. Implémentez une méthode reportStats simple. N'hésitez pas à modifier le format de sortie selon vos préférences.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. Pour finir, implémentez la méthode resetStats, qui crée une nouvelle instance stockStats.

    stockStats = new StockStats();
Pour exécuter l'application consommateur
  1. Exécutez l'application producteur que vous avez écrite dans Implémenter le producteur pour injecter des enregistrements d'opérations boursières simulées dans votre flux.

  2. Vérifiez que la clé d'accès et la paire de clés secrètes récupérées précédemment (lors de la création de IAM l'utilisateur) sont enregistrées dans le fichier~/.aws/credentials.

  3. Exécutez la classe StockTradesProcessor avec les arguments suivants :

    StockTradesProcessor StockTradeStream us-west-2

    Notez que, si vous avez créé votre flux dans une région autre que us-west-2, vous devez spécifier cette région ici.

Au bout d'une minute, vous devez voir une sortie similaire à la suivante, actualisée toutes les minutes :

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

Informations supplémentaires sur le consommateur

Si vous connaissez bien les avantages de la Kinesis Client Library, présentés dans Développez des KCL consommateurs 1.x et ailleurs, vous vous demandez peut-être pourquoi vous devez les utiliser ici. Bien que vous n'utilisiez qu'un seul flux de partition et une seule instance de consommateur pour le traiter, il est toujours plus facile d'implémenter le consommateur à l'aide duKCL. Comparez les étapes d'implémentation du code dans l'application producteur à celles de l'application consommateur pour voir combien il est plus facile d'implémenter une application consommateur. Cela est dû en grande partie aux services qu'ils KCL fournissent.

Dans cette application, vous vous concentrez sur l'implémentation d'une classe de processeur d'enregistrements de processeur qui peut traiter des enregistrements individuels. Vous n'avez pas à vous soucier de la manière dont les enregistrements sont extraits de Kinesis Data Streams ; KCL le système récupère les enregistrements et appelle le processeur d'enregistrements chaque fois que de nouveaux enregistrements sont disponibles. En outre, vous n'avez pas à vous soucier du nombre d'instances de partitions et de consommateurs qui existent. Si le flux est mis à l'échelle, vous n'avez pas à réécrire votre application afin de gérer plus d'une partition ou une instance de consommateurs.

Le terme point de contrôle signifie enregistrer le point dans le flux jusqu'aux enregistrements de données qui ont été consommés et traités jusqu'à présent. Si l'application plante, le flux est lu à partir de ce point et non depuis le début du flux. L'objet de contrôle, les différents modèles de conception et les meilleures pratiques pour à ce sujet sont exclus de ce chapitre. Cependant, il probable que vous rencontriez ces éléments dans les environnement de production.

Comme vous l'avez appris dansImplémenter le producteur, les put opérations dans les Kinesis Data API Streams utilisent une clé de partition comme entrée. Kinesis Data Streams utilise une clé de partition comme mécanisme pour fractionner les enregistrements sur plusieurs partitions (lorsqu'il y a plusieurs partitions dans le flux). La même clé de partition achemine toujours vers la même partition. Cela permet à l'application consommateur qui traite une partition spécifique d'être conçue en supposant que les enregistrements ayant la même clé de partition ne sont envoyés qu'à cette application consommateur, et qu'aucun enregistrement ayant cette même clé de partition ne parvient à une autre application consommateur. Par conséquent, une application de travail d'une application consommateur peut regrouper tous les enregistrements ayant la même clé de partition sans se soucier de savoir s'il manque des données nécessaires.

Dans cette application, le traitement des enregistrements par le consommateur n'est pas intensif. Vous pouvez donc utiliser une seule partition et effectuer le traitement dans le même fil que le KCL fil. Toutefois, il convient en pratique de prendre en considération la mise à l'échelle du nombre de partitions. Dans certains cas, vous pouvez basculer le traitement vers un thread différent ou utiliser un pool de threads si le traitement des enregistrements est prévu être intensif.0 De cette façon, ils KCL peuvent récupérer de nouveaux enregistrements plus rapidement tandis que les autres threads peuvent traiter les enregistrements en parallèle. La conception multithread n'est pas banale et doit être abordée à l'aide de techniques avancées. L'augmentation du nombre de partitions est donc généralement le moyen le plus efficace d'augmenter le nombre de partitions.

Étapes suivantes

(Facultatif) Étendre le consommateur