Tutoriel : Exécutez des opérations Kinesis Data Streams de base à l'aide du AWS CLI - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Tutoriel : Exécutez des opérations Kinesis Data Streams de base à l'aide du AWS CLI

Cette section décrit l'utilisation de base d'un flux de données Kinesis à partir de la ligne de commande à l'aide de l'interface AWS CLI. Assurez-vous de bien connaître les concepts abordés dans Terminologie et concepts relatifs à Amazon Kinesis Data Streams

Note

Une fois que vous avez créé un stream, votre compte est soumis à des frais minimes pour l'utilisation de Kinesis Data Streams, car Kinesis Data Streams n'est pas éligible au AWS niveau gratuit. Lorsque vous aurez terminé ce didacticiel, supprimez vos AWS ressources pour ne plus encourir de frais. Pour plus d’informations, consultez Étape 4 : Nettoyer.

Étape 1 : créer un stream

Votre première étape consiste à créer un flux de données et à vérifier qu'il a été créé avec succès. Utilisez la commande suivante pour créer un flux nommé « Foo » :

aws kinesis create-stream --stream-name Foo

Entrez ensuite la commande suivante pour vérifier le progression de la création du flux :

aws kinesis describe-stream-summary --stream-name Foo

Vous devez obtenir une sortie similaire à l'exemple suivant :

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

Dans cet exemple, le flux possède un statutCREATING, ce qui signifie qu'il n'est pas encore prêt à être utilisé. Vérifiez à nouveau après quelques instants. Vous devez voir une sortie similaire à l'exemple suivant :

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

Cette sortie contient des informations dont vous n'avez pas besoin pour ce didacticiel. Les informations importantes pour le moment sont "StreamStatus": "ACTIVE" les suivantes : elles indiquent que le flux est prêt à être utilisé, ainsi que les informations sur le fragment que vous avez demandé. Vous pouvez également vérifier l'existence de votre nouveau flux en utilisant la commande list-streams, comme il est illustré ici :

aws kinesis list-streams

Sortie :

{ "StreamNames": [ "Foo" ] }

Étape 2 : Insérer un enregistrement

Maintenant que vous avez un flux actif, vous êtes prêt à placer des données. Pour ce didacticiel, vous allez utiliser la commande la plus simple possible, soit put-record, qui place un enregistrement de données spécifique contenant le texte « testdata » dans le flux :

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

Cette commande, si elle aboutit, génère une sortie similaire à l'exemple suivant :

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

Félicitations, vous venez d'ajouter des données à un flux ! Vous allez ensuite voir comment extraire des données d'un flux.

Étape 3 : Obtenir le dossier

GetShardIterator

Avant de pouvoir obtenir des données à partir du flux, vous devez obtenir l'itérateur de partition correspondant à la partition qui vous intéresse. L'itérateur de partition représente la position du flux et de la partition à partir de laquelle l'application consommateur (dans ce cas, la commande get-record) lit. Vous allez utiliser la get-shard-iterator commande comme suit :

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

N'oubliez pas que les aws kinesis commandes sont associées à Kinesis Data API Streams. Si vous êtes curieux de connaître l'un des paramètres affichés, vous pouvez en savoir plus sur eux dans GetShardIteratorAPIla rubrique de référence. Une exécution réussie produira un résultat similaire à l'exemple suivant :

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

Cette longue chaîne de caractères apparemment aléatoires correspond à l'itérateur de partition (la vôtre sera différente). Vous devez copier/coller l'itérateur de partition dans la commande get, illustrée ci-dessous. Les itérateurs de partition ont une durée de vie valide de 300 secondes, qui doit vous suffire pour en copier/coller un dans la commande suivante. Vous devez supprimer toutes les nouvelles lignes de votre itérateur de partition avant de les coller à la commande suivante. Si vous recevez un message d'erreur indiquant que l'itérateur de partition n'est plus valide, réexécutez la get-shard-iterator commande.

GetRecords

La get-records commande extrait les données du flux et se transforme GetRecordsen un appel vers Kinesis Data API Streams. L'itérateur de partition spécifie la position de la partition à partir de laquelle vous souhaitez démarrer les enregistrements de données de façon séquentielle. Si aucun enregistrement n'est disponible dans la partie de la partition vers laquelle pointe l'itérateur, GetRecords renvoie une liste vide. Plusieurs appels peuvent être nécessaires pour accéder à une partie de la partition contenant des enregistrements.

Dans l'exemple de get-records commande suivant :

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

Si vous exécutez ce didacticiel à partir d'un processeur de commande de type Unix tel que bash, vous pouvez automatiser l'acquisition de l'itérateur de partition à l'aide d'une commande imbriquée, comme celle-ci :

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

Si vous exécutez ce didacticiel à partir d'un système compatible PowerShell, vous pouvez automatiser l'acquisition de l'itérateur de partition à l'aide d'une commande telle que celle-ci :

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

Si la get-records commande aboutit, des enregistrements seront demandés dans votre flux pour la partition que vous avez spécifiée lorsque vous avez obtenu l'itérateur de partition, comme dans l'exemple suivant :

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

Notez que cela get-records est décrit ci-dessus comme une demande, ce qui signifie que vous pouvez recevoir zéro enregistrement ou plus même s'il y en a dans votre stream. Les enregistrements renvoyés peuvent ne pas représenter tous les enregistrements actuellement présents dans votre flux. C'est normal, et le code de production interrogera le flux à la recherche d'enregistrements à des intervalles appropriés. Cette vitesse d'interrogation varie en fonction des exigences spécifiques de conception de votre application.

Dans votre enregistrement dans cette partie du didacticiel, vous remarquerez que les données semblent être nulles, et ce n'est pas le texte clair que testdata nous avons envoyé. Cela provient de la manière dont put-record utilise l'encodage en Base64 pour vous permettre d'envoyer des données binaires. Cependant, la prise en charge de Kinesis Data Streams ne permet pas AWS CLI le décodage Base64, car le décodage Base64 en contenu binaire brut imprimé sur stdout peut entraîner des comportements indésirables et des problèmes de sécurité potentiels sur certaines plateformes et terminaux. Si vous utilisez un décodeur Base64 (par exemple, https://www.base64decode.org/) pour décoder manuellement dGVzdGRhdGE=, vous verrez que le texte correspond en fait à testdata. Cela est suffisant pour les besoins de ce didacticiel car, dans la pratique, le AWS CLI est rarement utilisé pour consommer des données. Le plus souvent, il est utilisé pour surveiller l'état du flux et obtenir des informations, comme indiqué précédemment (describe-streametlist-streams). Pour plus d'informations à ce sujetKCL, consultez la section Développement de consommateurs personnalisés avec un débit partagé en utilisant KCL.

get-recordsne renvoie pas toujours tous les enregistrements du stream/shard spécifié. Lorsque cela arrive, utilisez le NextShardIterator à partir du dernier résultat pour obtenir l'ensemble d'enregistrements suivant. Si davantage de données étaient introduites dans le flux, ce qui est normal dans les applications de production, vous pourriez continuer à interroger les données utilisées à get-records chaque fois. Toutefois, si vous n'appelez pas get-records en utilisant l'itérateur de partition suivant pendant la durée de vie de 300 secondes de l'itérateur de partition, vous recevrez un message d'erreur et vous devrez utiliser la get-shard-iterator commande pour obtenir un nouvel itérateur de partition.

Cette sortie indique également le nombre de millisecondes pendant lesquelles la réponse de l'GetRecordsopération se situe MillisBehindLatest à partir de la fin du flux, indiquant à quel point le consommateur est en retard sur l'heure actuelle. Une valeur égale à zéro indique que le traitement des enregistrements est terminé et qu'il ne reste plus d'enregistrements à traiter pour le moment. Dans ce didacticiel, vous pouvez voir un nombre très grand si vous prenez le temps de lire le texte. Par défaut, les enregistrements de données restent dans un flux pendant 24 heures en attendant que vous les récupériez. Ce laps de temps s'appelle la période de conservation et peut être défini sur une valeur maximale de 365 jours.

Un get-records résultat positif aura toujours un effet NextShardIterator même s'il n'y a plus d'enregistrement actuellement dans le flux. Il s'agit d'un modèle d'interrogation qui suppose qu'un producteur met potentiellement plus d'enregistrements dans le flux à un moment donné. Bien que vous puissiez écrire vos propres routines de sondage, si vous utilisez les méthodes mentionnées précédemment KCL pour développer des applications grand public, ce sondage est pris en charge pour vous.

Si vous appelez get-records jusqu'à ce qu'il n'y ait plus d'enregistrements dans le flux et la partition que vous extrayez, vous verrez un résultat contenant des enregistrements vides, comme dans l'exemple suivant :

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

Étape 4 : Nettoyer

Supprimez votre stream pour libérer des ressources et éviter des frais involontaires sur votre compte. Faites-le chaque fois que vous avez créé un flux et que vous n'allez pas l'utiliser, car des frais s'accumulent par flux, que vous y mettiez et que vous obteniez des données avec celui-ci ou non. La commande de nettoyage est la suivante :

aws kinesis delete-stream --stream-name Foo

Le succès n'entraîne aucune sortie. describe-streamÀ utiliser pour vérifier la progression de la suppression :

aws kinesis describe-stream-summary --stream-name Foo

Si vous exécutez cette commande immédiatement après la commande de suppression, vous obtiendrez un résultat similaire à l'exemple suivant :

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

Une fois que le flux est entièrement supprimé, la commande describe-stream génère une erreur de type « introuvable » :

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.