Développez un client de bibliothèque cliente Kinesis dans Node.js - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Développez un client de bibliothèque cliente Kinesis dans Node.js

Vous pouvez utiliser la bibliothèque cliente Kinesis (KCL) pour créer des applications qui traitent les données issues de vos flux de données Kinesis. La KCL est disponible en plusieurs langues. Cette rubrique présente Node.js.

KCLIl s'agit d'une bibliothèque Java ; la prise en charge de langages autres que Java est fournie à l'aide d'une interface multilingue appelée. MultiLangDaemon Ce démon est basé sur Java et s'exécute en arrière-plan lorsque vous utilisez un KCL langage autre que Java. Par conséquent, si vous installez le fichier KCL for Node.js et que vous écrivez votre application client entièrement dans Node.js, vous devez toujours installer Java sur votre système en raison du MultiLangDaemon. En outre, MultiLangDaemon il comporte certains paramètres par défaut que vous devrez peut-être personnaliser en fonction de votre cas d'utilisation, par exemple, la AWS région à laquelle il se connecte. Pour plus d'informations MultiLangDaemon sur l'activation GitHub, rendez-vous sur la page KCL MultiLangDaemon du projet.

Pour télécharger le fichier Node.js KCL depuis GitHub, accédez à la bibliothèque cliente Kinesis (Node.js).

Téléchargements des exemples de code

Deux exemples de code sont disponibles KCL dans Node.js :

  • basic-sample​​

    Utilisé dans les sections suivantes pour illustrer les principes fondamentaux de la création d'une application KCL grand public dans Node.js.

  • click-stream-sample

    Il est un peu plus avancé et se sert d'un scénario réel. A utiliser après vous être familiarisé avec l'exemple de code de base. Cet exemple n'est pas abordé ici mais contient un README fichier contenant plus d'informations.

Vous devez effectuer les tâches suivantes lors de l'implémentation d'une application KCL grand public dans Node.js :

Implémenter le processeur d'enregistrement

Le consommateur le plus simple possible utilisant KCL for Node.js doit implémenter une recordProcessor fonction, qui contient à son tour les fonctions initializeprocessRecords, etshutdown. L'exemple fournit une implémentation que vous pouvez utiliser comme point de départ (voir sample_kcl_app.js).

function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}
initialisation

La KCL initialize fonction est appelée lorsque le processeur d'enregistrement démarre. Ce processeur d'enregistrements traite uniquement l'ID de partition passé à initializeInput.shardId et, en règle générale, l'inverse est également vrai (cette partition est traitée uniquement par ce processeur d'enregistrements). Cependant, votre application consommateur doit prendre en compte la possibilité qu'un enregistrement de données peut être traité plusieurs fois. Cela provient du fait que Kinesis Data Streams a la sémantique au moins une fois, qui signifie que chaque enregistrement de données issu d'une partition est traité au moins une fois par une application de travail dans votre application consommateur. Pour plus d'informations sur les cas dans lesquels une partition spécifique peut éventuellement être traitée par plusieurs applications de travail, consultez la page Utilisez le redécoupage, le dimensionnement et le traitement parallèle pour modifier le nombre de partitions.

initialize: function(initializeInput, completeCallback)
processRecords

Il KCL appelle cette fonction avec une entrée contenant une liste d'enregistrements de données provenant de la partition spécifiée pour la initialize fonction. Le processeur d'enregistrements que vous implémentez traite les données figurant dans ces enregistrements suivant la sémantique de votre application consommateur. Par exemple, l'application de travail peut exécuter une transformation sur les données et stocker ensuite le résultat dans un compartiment Amazon Simple Storage Service (Amazon S3).

processRecords: function(processRecordsInput, completeCallback)

En plus des données elles-même, l'enregistrement contient également un numéro de séquence et une clé de partition, que l'application de travail peut utiliser pour traiter les données. Par exemple, l'application de travail peut choisir le compartiment S3 dans lequel stocker les données en fonction de la valeur de la clé de partition. Le dictionnaire record expose les paires clé-valeur suivantes pour accéder aux données, numéro de séquence et clé de partition de l'enregistrement :

record.data record.sequenceNumber record.partitionKey

Notez que les données sont encodées en Base64.

Dans l'exemple de base, la fonction processRecords contient du code qui indique comment une application de travail peut accéder aux données, numéro de séquence et clé de partition de l'enregistrement.

Kinesis Data Streams exige que le processeur d'enregistrements effectue le suivi des enregistrements qui ont déjà été traités dans une partition. KCLIl s'occupe de ce suivi avec un checkpointer objet transmis sous le nom deprocessRecordsInput.checkpointer. Votre processeur de fichiers appelle la checkpointer.checkpoint fonction pour indiquer dans KCL quelle mesure il a progressé dans le traitement des enregistrements de la partition. En cas d'échec du worker, il KCL utilise ces informations lorsque vous redémarrez le traitement de la partition afin qu'il continue à partir du dernier enregistrement traité connu.

Dans le cas d'une opération de division ou de fusion, le traitement des nouvelles partitions KCL ne commence pas tant que les processeurs des partitions d'origine n'ont pas appelé checkpoint pour signaler que le traitement des partitions d'origine est terminé.

Si vous ne transmettez pas le numéro de séquence à la checkpoint fonction, cela KCL suppose que l'appel à checkpoint signifie que tous les enregistrements ont été traités, jusqu'au dernier enregistrement transmis au processeur d'enregistrements. Par conséquent, le processeur d'enregistrements doit appeler checkpoint seulement après avoir traité tous les enregistrements de la liste qui lui a été passée. Les processeurs d'enregistrements n'ont pas besoin d'appeler checkpoint à chaque appel de processRecords. Un processeur peut, par exemple, appeler checkpoint tous les trois appels ou lors d'un événement externe au processeur d'enregistrements, tel qu'un service de vérification/validation personnalisé que vous avez implémenté.

Vous pouvez éventuellement spécifier le numéro de séquence précis d'un enregistrement comme paramètre à checkpoint. Dans ce cas, le KCL suppose que tous les enregistrements ont été traités jusqu'à cet enregistrement uniquement.

L'exemple d'application de base montre l'appel le plus simple possible de la fonction checkpointer.checkpoint. Vous pouvez ajouter à la fonction une autre logique de points de contrôle nécessaire pour votre application consommateur à ce stade.

shutdown

Il KCL appelle la shutdown fonction soit lorsque le traitement se termine (shutdownInput.reasonestTERMINATE), soit lorsque le travailleur ne répond plus (shutdownInput.reasonestZOMBIE).

shutdown: function(shutdownInput, completeCallback)

Le traitement se termine lorsque le processeur d'enregistrements ne reçoit plus d'enregistrements de la partition, car la partition a été fractionnée ou fusionnée, ou le flux a été supprimé.

KCLTransmet également un shutdownInput.checkpointer objet àshutdown. Si le motif de fermeture est TERMINATE, vous devez vous assurer que le processeur d'enregistrements a fini de traiter les enregistrements de données et appeler ensuite la fonction checkpoint sur cet objet.

Modifier les propriétés de configuration

L'exemple fournit les valeurs par défaut des propriétés de configuration. Vous pouvez remplacer ces propriétés par vos propres valeurs (voir sample.properties dans l'exemple de base).

Nom de l'application

Cela KCL nécessite une application unique parmi vos applications et parmi les tables Amazon DynamoDB d'une même région. Elle utilise la valeur de configuration du nom d'application des manières suivantes :

  • Tous les programmes d'exécution associés à ce nom d'application sont considérés comme rattachés au même flux. Ces programmes d'exécution peuvent être répartis sur plusieurs instances. Si vous exécutez une instance supplémentaire du même code d'application, mais avec un nom d'application différent, KCL la deuxième instance est traitée comme une application entièrement distincte qui fonctionne également sur le même flux.

  • KCLcrée une table DynamoDB avec le nom de l'application et utilise la table pour gérer les informations d'état (telles que les points de contrôle et le mappage worker-shard) de l'application. Chaque application a son propre tableau DynamoDB. Pour plus d’informations, consultez Utilisez un tableau des baux pour suivre les partitions traitées par l'application KCL client.

Configurer les informations d'identification

Vous devez mettre vos AWS informations d'identification à la disposition de l'un des fournisseurs d'informations d'identification de la chaîne de fournisseurs d'informations d'identification par défaut. Vous pouvez utiliser la propriété AWSCredentialsProvider pour définir un fournisseur d'informations d'identification. Le fichier sample.properties doit mettre vos informations d'identification à disposition de l'un des fournisseurs d'informations d'identification appartenant à la chaîne des fournisseurs d'informations d'identification par défaut. Si vous exécutez votre client sur une EC2 instance Amazon, nous vous recommandons de configurer l'instance avec un IAM rôle. AWS les informations d'identification qui reflètent les autorisations associées à ce IAM rôle sont mises à la disposition des applications de l'instance via ses métadonnées d'instance. Il s'agit de la méthode la plus sûre pour gérer les informations d'identification d'une application grand public exécutée sur une EC2 instance.

L'exemple suivant est configuré KCL pour traiter un flux de données Kinesis kclnodejssample nommé à l'aide du processeur d'enregistrement fourni dans : sample_kcl_app.js

# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default AWS credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON