Développement d'une application consommateur de la bibliothèque client Kinesis en Python - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Développement d'une application consommateur de la bibliothèque client Kinesis en Python

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente Python.

La KCL est une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. MultiLangDaemon Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par conséquent, si vous installez la KCL pour Python et que vous écrivez votre application grand public entièrement en Python, vous devez toujours installer Java sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page du MultiLangDaemon projet KCL.

Pour télécharger la KCL Python depuis GitHub, accédez à la bibliothèque cliente Kinesis (Python). Pour télécharger un exemple de code pour une application client KCL Python, rendez-vous sur la page d'exemple de projet KCL pour Python sur. GitHub

Vous devez effectuer les tâches suivantes lorsque vous implémentez une application consommateur KCL en Python :

Implémenter les méthodes RecordProcessor de classe

La classe RecordProcess doit étendre la classe RecordProcessorBase pour implémenter les méthodes ci-après. L'exemple fournit des implémentations que vous pouvez utiliser comme point de départ (voir sample_kclpy_app.py).

def initialize(self, shard_id) def process_records(self, records, checkpointer) def shutdown(self, checkpointer, reason)
initialisation

La KCL appelle la méthode initialize lorsque le processeur d'enregistrements est instancié, en passant un ID de partition spécifique comme paramètre. Ce processeur d'enregistrements traite uniquement cette partition et, en règle générale, l'inverse est également vrai (cette partition est traitée uniquement par ce processeur d'enregistrements). Cependant, votre application consommateur doit prendre en compte la possibilité qu'un enregistrement de données peut être traité plusieurs fois. Cela provient du fait que Kinesis Data Streams a la sémantique au moins une fois, qui signifie que chaque enregistrement de données issu d'une partition est traité au moins une fois par une application de travail dans votre application consommateur. Pour plus d'informations sur les cas dans lesquels une partition spécifique peut être traitée par plusieurs applications de travail, consultez la page Repartitionnement, mise à l'échelle et traitement parallèle.

def initialize(self, shard_id)
process_records

La KCL appelle cette méthode en passant une liste d'enregistrements de données issus de la partition spécifiée par la méthode initialize. Le processeur d'enregistrements que vous implémentez traite les données figurant dans ces enregistrements suivant la sémantique de votre application consommateur. Par exemple, l'application de travail peut exécuter une transformation sur les données et stocker ensuite le résultat dans un compartiment Amazon Simple Storage Service (Amazon S3).

def process_records(self, records, checkpointer)

En plus des données elles-même, l'enregistrement contient également un numéro de séquence et une clé de partition. L'application de travail utilise ces valeurs lors du traitement des données. Par exemple, l'application de travail peut choisir le compartiment S3 dans lequel stocker les données en fonction de la valeur de la clé de partition. Le dictionnaire record expose les paires clé-valeur suivantes pour accéder aux données, numéro de séquence et clé de partition de l'enregistrement :

record.get('data') record.get('sequenceNumber') record.get('partitionKey')

Notez que les données sont encodées en Base64.

Dans l'exemple, la méthode process_records contient du code qui montre comment une application de travail peut accéder aux données, numéro de séquence et clé de partition de l'enregistrement.

Kinesis Data Streams exige que le processeur d'enregistrements effectue le suivi des enregistrements qui ont déjà été traités dans une partition. La KCL assure ce suivi à votre place en passant un objet Checkpointer à process_records. Le processeur d'enregistrements appelle la méthode checkpoint sur cet objet pour informer la KCL de son avancement dans le traitement des enregistrements de la partition. Si le travail échoue, la KCL utilise ces informations pour redémarrer le traitement de la partition au niveau du dernier enregistrement traité connu.

Dans le cas d'un fractionnement ou d'une fusion, la KCL ne commence pas à traiter les nouvelles partitions tant que les processeurs des partitions d'origine n'ont pas appelé checkpoint pour signaler que l'ensemble du traitement sur les partitions d'origine est terminé.

Si vous ne passez pas de paramètre, la KCL suppose que l'appel de checkpoint signifie que tous les enregistrements ont été traités jusqu'au dernier enregistrement qui a été passé au processeur d'enregistrements. Par conséquent, le processeur d'enregistrements doit appeler checkpoint seulement après avoir traité tous les enregistrements de la liste qui lui a été passée. Les processeurs d'enregistrements n'ont pas besoin d'appeler checkpoint à chaque appel de process_records. Un processeur peut, par exemple, appeler checkpoint tous les trois appels. Vous pouvez éventuellement spécifier le numéro de séquence précis d'un enregistrement comme paramètre à checkpoint. Dans ce cas, la KCL suppose que tous les enregistrements ont été traités jusqu'à cet enregistrement uniquement.

Dans l'exemple, la méthode privée checkpoint montre comment appeler la méthode Checkpointer.checkpoint en utilisant la logique appropriée de traitement des exceptions et de nouvelle tentative.

La KCL s'appuie sur process_records pour gérer toutes les exceptions générées par le traitement des enregistrements de données. Si une exception est déclenchée depuis process_records, la KCL ignore les enregistrements de données qui ont été transmis à process_records avant l'exception. En d'autres termes, ces enregistrements ne sont pas renvoyés au processeur d'enregistrements qui a lancé l'exception ou à tout autre processeur d'enregistrement dans l'application consommateur.

shutdown

La KCL appelle la méthode shutdown soit à la fin du traitement (le motif de fermeture étant TERMINATE) ou lorsque l'application de travail ne répond plus (la raison de fermeture reason ayant la valeur ZOMBIE).

def shutdown(self, checkpointer, reason)

Le traitement se termine lorsque le processeur d'enregistrements ne reçoit plus d'enregistrements de la partition, car la partition a été fractionnée ou fusionnée, ou le flux a été supprimé.

La KCL passe également un objet Checkpointer à shutdown. Si le motif de fermeture reason est TERMINATE, le processeur d'enregistrements doit terminer le traitement des enregistrements de données et appeler ensuite la méthode checkpoint sur cette interface.

Modifier les propriétés de configuration

L'exemple fournit les valeurs par défaut des propriétés de configuration. Vous pouvez remplacer ces propriétés par vos propres valeurs (voir sample.properties).

Nom de l'application

La KCL nécessite un nom d'application qui est unique parmi vos applications et parmi les tableaux Amazon DynamoDB dans la même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :

  • Tous les programmes d'exécution qui sont associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une autre instance du même code d'application, mais sous un autre nom d'application, la KCL traite cette seconde instance comme une application totalement distincte, associée elle aussi au même flux.

  • La KCL crée un tableau DynamoDB portant ce nom d'application et utilise la table pour tenir à jour les informations d'état (par exemple, les points de contrôle et le mappage d'application de travail-partition) pour l'application. Chaque application a son propre tableau DynamoDB. Pour plus d’informations, consultez Utilisation d'une table des baux pour suivre les partitions traitées par l'application consommateur KCL.

Configurer les informations d'identification

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs d'informations d'identification par défaut. Vous pouvez utiliser la propriété AWSCredentialsProvider pour définir un fournisseur d'informations d'identification. Le fichier sample.properties doit mettre vos informations d'identification à disposition de l'un des fournisseurs d'informations d'identification appartenant à la chaîne des fournisseurs d'informations d'identification par défaut. Si vous exécutez votre application client sur une instance Amazon EC2, nous vous recommandons de configurer l'instance avec un rôle IAM. Les informations d'identification AWS qui reflètent les autorisations associées à ce rôle IAM sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. C'est le moyen le plus sûr de gérer les informations d'identification pour une application consommateur exécutée sur une instance EC2.

Dans l'exemple, le fichier de propriétés configure la KCL pour traiter un flux de données Kinesis appelé « words » à l'aide du processeur d'enregistrements fourni dans sample_kclpy_app.py.