Développement d'une application consommateur de la bibliothèque client Kinesis en .NET - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Développement d'une application consommateur de la bibliothèque client Kinesis en .NET

Vous pouvez utiliser la bibliothèque client Kinesis (KCL) dans le développement d'applications capables de traiter les données de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente .NET.

La KCL est une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. MultiLangDaemon Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un langage KCL autre que Java. Par conséquent, si vous installez la KCL pour .NET et que vous écrivez votre application grand public entièrement en .NET, vous devez toujours installer Java sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page du MultiLangDaemon projet KCL.

Pour télécharger le .NET KCL depuis GitHub, accédez à la bibliothèque cliente Kinesis (.NET). Pour télécharger un exemple de code pour une application client .NET KCL, rendez-vous sur la page du projet client d'exemple KCL pour .NET sur. GitHub

Vous devez effectuer les tâches suivantes lorsque vous implémentez une application consommateur KCL en .NET :

Implémenter les méthodes RecordProcessor de classe I

L'application consommateur doit implémenter les méthodes suivantes pour IRecordProcessor. L'exemple d'application consommateur fournit des implémentations que vous pouvez utiliser comme point de départ (voir la classe SampleRecordProcessor dans SampleConsumer/AmazonKinesisSampleConsumer.cs).

public void Initialize(InitializationInput input) public void ProcessRecords(ProcessRecordsInput input) public void Shutdown(ShutdownInput input)
Initialiser

La KCL appelle cette méthode lorsque le processeur d'enregistrements est instancié, en passant un ID de partition spécifique dans le paramètre input (input.ShardId). Ce processeur d'enregistrements traite uniquement cette partition et, en règle générale, l'inverse est également vrai (cette partition est traitée uniquement par ce processeur d'enregistrements). Cependant, votre application consommateur doit prendre en compte la possibilité qu'un enregistrement de données peut être traité plusieurs fois. Cela provient du fait que Kinesis Data Streams a la sémantique au moins une fois, qui signifie que chaque enregistrement de données issu d'une partition est traité au moins une fois par une application de travail dans votre application consommateur. Pour plus d'informations sur les cas dans lesquels une partition spécifique peut éventuellement être traitée par plusieurs applications de travail, consultez la page Repartitionnement, mise à l'échelle et traitement parallèle.

public void Initialize(InitializationInput input)
ProcessRecords

La KCL appelle cette méthode, en passant une liste d'enregistrements de données dans le paramètre input (input.Records), qui sont issues de la partition spécifiée par la méthode Initialize. Le processeur d'enregistrements que vous implémentez traite les données figurant dans ces enregistrements suivant la sémantique de votre application consommateur. Par exemple, l'application de travail peut exécuter une transformation sur les données et stocker ensuite le résultat dans un compartiment Amazon Simple Storage Service (Amazon S3).

public void ProcessRecords(ProcessRecordsInput input)

En plus des données elles-même, l'enregistrement contient également un numéro de séquence et une clé de partition. L'application de travail utilise ces valeurs lors du traitement des données. Par exemple, l'application de travail peut choisir le compartiment S3 dans lequel stocker les données en fonction de la valeur de la clé de partition. La classe Record expose le code suivant pour accéder aux données, numéro de séquence et clé de partition de l'enregistrement :

byte[] Record.Data string Record.SequenceNumber string Record.PartitionKey

Dans l'exemple, la méthode ProcessRecordsWithRetries contient du code qui montre comment une application de travail peut accéder aux données, numéro de séquence et clé de partition de l'enregistrement.

Kinesis Data Streams exige que le processeur d'enregistrements effectue le suivi des enregistrements qui ont déjà été traités dans une partition. La KCL assure ce suivi à votre place en passant un objet Checkpointer à ProcessRecords (input.Checkpointer). Le processeur d'enregistrements appelle la méthode Checkpointer.Checkpoint pour informer la KCL de son avancement dans le traitement des enregistrements de la partition. Si le travail échoue, la KCL utilise ces informations pour redémarrer le traitement de la partition au niveau du dernier enregistrement traité connu.

Dans le cas d'un fractionnement ou d'une fusion, la KCL ne commence pas à traiter les nouvelles partitions tant que les processeurs des partitions d'origine n'ont pas appelé Checkpointer.Checkpoint pour signaler que l'ensemble du traitement sur les partitions d'origine est terminé.

Si vous ne passez pas de paramètre, la KCL suppose que l'appel de Checkpointer.Checkpoint signifie que tous les enregistrements ont été traités jusqu'au dernier enregistrement qui a été passé au processeur d'enregistrements. Par conséquent, le processeur d'enregistrements doit appeler Checkpointer.Checkpoint seulement après avoir traité tous les enregistrements de la liste qui lui a été passée. Les processeurs d'enregistrements n'ont pas besoin d'appeler Checkpointer.Checkpoint à chaque appel de ProcessRecords. Un processeur peut, par exemple, appeler Checkpointer.Checkpoint tous les trois ou quatre appels. Vous pouvez éventuellement spécifier le numéro de séquence précis d'un enregistrement comme paramètre à Checkpointer.Checkpoint. Dans ce cas, la KCL suppose que les enregistrements ont été traités seulement jusqu'à cet enregistrement.

Dans l'exemple, la méthode privée Checkpoint(Checkpointer checkpointer) montre comment appeler la méthode Checkpointer.Checkpoint en utilisant la logique appropriée de traitement des exceptions et de nouvelle tentative.

La KCL pour .NET gère les exceptions différemment des autres bibliothèques de langage KCL, car elle ne gère pas toutes les exceptions générées par le traitement des enregistrements de données. Toutes les exceptions non interceptées dans le code utilisateur bloquent le programme.

Fermeture

La KCL appelle la méthode Shutdown soit à la fin du traitement (le motif de fermeture étant TERMINATE) ou lorsque l'application de travail ne répond plus (la raison de fermeture input.Reason ayant la valeur ZOMBIE).

public void Shutdown(ShutdownInput input)

Le traitement se termine lorsque le processeur d'enregistrements ne reçoit plus d'enregistrements de la partition, car la partition a été fractionnée ou fusionnée, ou le flux a été supprimé.

La KCL passe également un objet Checkpointer à shutdown. Si le motif de fermeture est TERMINATE, le processeur d'enregistrements doit terminer le traitement des enregistrements de données et appeler ensuite la méthode checkpoint sur cette interface.

Modifier les propriétés de configuration

L'exemple d'application consommateur fournit les valeurs par défaut des propriétés de configuration. Vous pouvez remplacer ces propriétés par vos propres valeurs (voir SampleConsumer/kcl.properties).

Nom de l'application

La KCL nécessite une d'application qui est unique parmi vos applications et parmi les tableaux Amazon DynamoDB dans la même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :

  • Tous les programmes d'exécution associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une autre instance du même code d'application, mais sous un autre nom d'application, la KCL traite cette seconde instance comme une application totalement distincte, associée elle aussi au même flux.

  • La KCL crée un tableau DynamoDB portant ce nom d'application et utilise la table pour tenir à jour les informations d'état (par exemple, les points de contrôle et le mappage d'application de travail-partition) pour l'application. Chaque application a son propre tableau DynamoDB. Pour plus d’informations, consultez Utilisation d'une table des baux pour suivre les partitions traitées par l'application consommateur KCL.

Configurer les informations d'identification

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs d'informations d'identification par défaut. Vous pouvez utiliser la propriété AWSCredentialsProvider pour définir un fournisseur d'informations d'identification. Le fichier sample.properties doit mettre vos informations d'identification à disposition de l'un des fournisseurs d'informations d'identification appartenant à la chaîne des fournisseurs d'informations d'identification par défaut. Si vous exécutez votre application client sur une instance Amazon EC2, nous vous recommandons de configurer l'instance avec un rôle IAM. Les informations d'identification AWS qui reflètent les autorisations associées à ce rôle IAM sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. C'est le moyen le plus sûr de gérer les informations d'identification pour une application consommateur exécutée sur une instance EC2.

Dans l'exemple, le fichier de propriétés configure la KCL pour traiter un flux de données Kinesis appelé « words » à l'aide du processeur d'enregistrements fourni dans AmazonKinesisSampleConsumer.cs.