Exécution d'opérations de flux de données Kinesis élémentaires à l'aide de l'AWS CLI - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exécution d'opérations de flux de données Kinesis élémentaires à l'aide de l'AWS CLI

Cette section décrit l'utilisation de base d'un flux de données Kinesis à partir de la ligne de commande à l'aide de l'interface AWS CLI. Assurez-vous de bien connaître les concepts abordés dans Amazon Kinesis Data Streams : terminologie et concepts

Note

Une fois que vous avez créé un flux, votre compte se voit facturer des frais nominaux pour l'utilisation de Kinesis Data Streams, car Kinesis Data Streams n'est pas éligible à l'offre gratuite AWS. Lorsque vous avez terminé ce tutoriel, supprimez vos ressources AWS pour ne plus être facturé. Pour de plus amples informations, veuillez consulter Étape 4 : Nettoyage.

Étape 1 : Créer un flux

Votre première étape consiste à créer un flux de données et à vérifier qu'il a été créé avec succès. Utilisez la commande suivante pour créer un flux nommé « Foo » :

aws kinesis create-stream --stream-name Foo

Entrez ensuite la commande suivante pour vérifier le progression de la création du flux :

aws kinesis describe-stream-summary --stream-name Foo

Vous devez obtenir une sortie similaire à l'exemple suivant :

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

Dans cet exemple, le flux a l'état CRÉATION, qui signifie qu'il n'est pas tout à fait prêt à être utilisé. Vérifiez à nouveau après quelques instants. Vous devez voir une sortie similaire à l'exemple suivant :

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

Cette sortie contient des informations dont vous n'avez pas besoin de vous occuper pour ce didacticiel. L'essentiel pour l'instant est "StreamStatus": "ACTIVE", qui vous indique que le flux est prêt à être utilisé, et les informations que vous avez demandées sur la partition unique. Vous pouvez également vérifier l'existence de votre nouveau flux en utilisant la commande list-streams, comme il est illustré ici :

aws kinesis list-streams

Sortie :

{ "StreamNames": [ "Foo" ] }

Étape 2 : Placer un enregistrement

Maintenant que vous avez un flux actif, vous êtes prêt à placer des données. Pour ce didacticiel, vous allez utiliser la commande la plus simple possible, soit put-record, qui place un enregistrement de données spécifique contenant le texte « testdata » dans le flux :

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

Cette commande, si elle aboutit, génère une sortie similaire à l'exemple suivant :

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

Félicitations, vous venez d'ajouter des données à un flux ! Vous allez ensuite voir comment extraire des données d'un flux.

Étape 3 : Extraction de l'enregistrement

GetShardIterator

Pour extraire des données du flux, vous devez obtenir l'itérateur de partition pour la partition qui vous intéresse. L'itérateur de partition représente la position du flux et de la partition à partir de laquelle l'application consommateur (dans ce cas, la commande get-record) lit. Vous allez utiliser la commande get-shard-iterator comme suit :

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

N'oubliez pas que les commandes aws kinesis reposent sur une API Kinesis Data Streams. Ainsi, si vous souhaitez connaître les paramètres indiqués, consultez la rubrique Référence d'API GetShardIterator. Si la commande s'exécute correctement, elle génère une sortie similaire à l'exemple suivant (faites défiler horizontalement pour voir la sortie en son entier) :

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

Cette longue chaîne de caractères apparemment aléatoires correspond à l'itérateur de partition (la vôtre sera différente). Vous devrez copier/coller l'itérateur de partition dans la commande get illustrée ci-après. Les itérateurs de partition ont une durée de vie valide de 300 secondes, qui doit vous suffire pour en copier/coller un dans la commande suivante. Notez que vous devez supprimer les sauts de ligne de l'itérateur de partition avant de le coller dans la commande suivante. Si vous obtenez un message d'erreur indiquant que l'itérateur de partition n'est plus valide, exécutez à nouveau la commande get-shard-iterator.

GetRecords

La commande get-records extrait les données du flux. Elle est résolue en un appel vers GetRecords dans l'API Kinesis Data Streams. L'itérateur de partition spécifie la position de la partition à partir de laquelle vous souhaitez démarrer les enregistrements de données de façon séquentielle. Si aucun enregistrement n'est disponible dans la partie de la partition vers laquelle pointe l'itérateur, GetRecords renvoie une liste vide. Notez que plusieurs appels peuvent être nécessaire pour parvenir à une portion de la partition qui contient les enregistrements.

Dans l'exemple suivant de la commande get-records (faites défiler horizontalement pour voir la commande en son entier) :

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

Si vous exécutez ce didacticiel à partir d'un interpréteur de commandes de type Unix tel que bash, vous pouvez automatiser l'acquisition de l'itérateur de partition à l'aide d'une commande imbriquée comme la commande suivante (faites défiler horizontalement pour voir la commande en entier) :

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

Si vous exécutez ce didacticiel depuis un système qui prend en charge PowerShell, vous pouvez automatiser l'acquisition de l'itérateur de partition à l'aide d'une commande telle que la suivante (faites défiler horizontalement pour voir la commande en entier) :

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

Si la commande get-records aboutit, il est demandé des enregistrements contenus dans votre flux dans la partition que vous avez spécifiée lorsque vous avez obtenu l'itérateur de partition, comme dans l'exemple suivant (faite défiler horizontalement pour voir la sortie en son entier) :

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

Notez que la commande get-records est décrite ci-dessus comme étant une requête, ce qui signifie que vous pouvez recevoir zéro enregistrement ou plus même si votre flux contient des enregistrements et que les enregistrements renvoyés peuvent ne pas représenter tous les enregistrements contenus actuellement dans votre flux. C'est parfaitement normal. Le code de production interroge simplement le flux à une fréquence appropriée pour trouver des enregistrements (la fréquence d'interrogation varie suivant les exigences spécifiques liées à la conception de l'application).

Dans cette partie du tutoriel, la première chose que vous constaterez probablement est que les données ressemblent à des données parasites, et non au texte en clair testdata que nous avons envoyé. Cela provient de la manière dont put-record utilise l'encodage en Base64 pour vous permettre d'envoyer des données binaires. Toutefois, la prise en charge de Kinesis Data Streams dans l'interface AWS CLI ne fournit pas le décodage en Base64, car le décodage Base64 en contenu binaire brut imprimé sur stdout peut engendrer un comportement indésirable et des problèmes de sécurité potentiels sur certaines plateformes et certains terminaux. Si vous utilisez un décodeur Base64 (par exemple, https://www.base64decode.org/) pour décoder manuellement dGVzdGRhdGE=, vous verrez que le texte correspond en fait à testdata. Cela suffit pour ce didacticiel, car, en pratique, l'AWS CLI sert rarement à utiliser des données, mais plus souvent à surveiller l'état du flux et à obtenir des informations, comme il a été illustré précédemment (describe-stream et list-streams). Les tutoriels futurs vous montreront comment créer des applications consommateur de qualité au niveau de la production à l'aide de la bibliothèque client (KCL), qui s'occupe de Base64 à votre place. Pour plus d'informations sur KCL, consultez Développement de consommateurs personnalisés avec un débit partagé (français non garanti) à l'aide de KCL.

get-records ne renvoie pas toujours tous les enregistrements appartenant au flux/à la partition spécifiée. Lorsque cela arrive, utilisez le NextShardIterator à partir du dernier résultat pour obtenir l'ensemble d'enregistrements suivant. Ainsi, si davantage de données sont placées dans le flux (ce qui est normal dans les applications de production), vous pouvez continuer à interroger les données chaque fois avec get-records. Toutefois, si vous n'appelez pas get-records avec l'itérateur de partition suivant dans le délai de durée de vie de 300 secondes de l'itérateur de partition, vous recevez un message d'erreur et vous devez utiliser la commande get-shard-iterator pour obtenir un nouvel itérateur de partition.

Cette sortie contient aussi MillisBehindLatest, qui est le nombre de millisecondes écoulées, pour la réponse de l'opération GetRecords à partir de l'extrémité du flux, et qui indique le retard que subit l'application consommateur. Une valeur égale à zéro indique que le traitement des enregistrements est terminé et qu'il ne reste plus d'enregistrements à traiter pour le moment. Dans ce didacticiel, vous pouvez voir un nombre très grand si vous prenez le temps de lire le texte. Ce n'est pas un problème, par défaut, les enregistrements de données demeurent dans un flux pendant 24 heures en attendant que vous les récupériez. Ce laps de temps s'appelle la période de conservation et peut être défini sur une valeur maximale de 365 jours.

Notez qu'un résultat get-records positif est toujours accompagné d'un NextShardIterator même si le flux ne contient actuellement plus d'enregistrements. Il s'agit d'un modèle d'interrogation qui suppose qu'un producteur met potentiellement plus d'enregistrements dans le flux à un moment donné. Bien que vous puissiez écrire vos propres routines d'interrogation, si vous utilisez la KCL mentionnée précédemment pour développer des applications consommateur, cette routine d'interrogation s'occupe de l'opération à votre place.

Si vous appelez get-records jusqu'à ce que le flux et la partition à partir desquels vous effectuez l'extraction ne contiennent plus d'enregistrements, vous verrez une sortie avec des enregistrements vides similaire à l'exemple suivant (faites défiler pour voir la sortie en son entier) :

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

Étape 4 : Nettoyage

Enfin, vous voudrez peut-être supprimer votre flux pour libérer de l'espace pour des ressources et éviter des frais de compte inutiles, comme mentionné précédemment. Effectuez cette opération de suppression chaque fois que vous avez créé un flux et que vous n'allez pas l'utiliser, car des frais sont facturés pour chaque flux même si vous y placez ou n'en extrayez aucune donnée. La commande de nettoyage est simple :

aws kinesis delete-stream --stream-name Foo

Si elle aboutit, aucune sortie n'est générée. Aussi vous voudrez peut-être utiliser la commande describe-stream pour consulter l'état de progression de la suppression :

aws kinesis describe-stream-summary --stream-name Foo

Si vous exécutez cette commande immédiatement après la commande de suppression, vous verrez probablement une sortie similaire à l'exemple suivant :

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

Une fois que le flux est entièrement supprimé, la commande describe-stream génère une erreur de type « introuvable » :

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.