Tutorial: Führen Sie grundlegende Kinesis Data Streams Streams-Operationen mit dem AWS CLI - Amazon-Kinesis-Data-Streams

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Tutorial: Führen Sie grundlegende Kinesis Data Streams Streams-Operationen mit dem AWS CLI

In diesem Abschnitt wird die grundlegende Verwendung eines Kinesis-Datenstroms über die Befehlszeile AWS CLI beschrieben. Stellen Sie sicher, dass Sie mit den unter Terminologie und Konzepte von Amazon Kinesis Data Streams behandelten Konzepten vertraut sind.

Anmerkung

Nachdem Sie einen Stream erstellt haben, fallen für Ihr Konto geringe Gebühren für die Nutzung von Kinesis Data Streams an, da Kinesis Data Streams nicht für das AWS kostenlose Kontingent in Frage kommt. Wenn Sie mit diesem Tutorial fertig sind, löschen Sie Ihre AWS Ressourcen, damit keine Gebühren mehr anfallen. Weitere Informationen finden Sie unter Schritt 4: Bereinigen.

Schritt 1: Erstellen Sie einen Stream

Der erste Schritt besteht darin, einen Stream zu erstellen und sicherzustellen, dass dieser erfolgreich erstellt wurde. Verwenden Sie den folgenden Befehl zum Erstellen eines Streams mit dem Namen „Foo”.

aws kinesis create-stream --stream-name Foo

Als Nächstes führen Sie den folgenden Befehl aus, um die Erstellung des Streams zu überprüfen:

aws kinesis describe-stream-summary --stream-name Foo

Sie sollten eine Ausgabe ähnlich dem folgenden Beispiel erhalten:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

In diesem Beispiel hat der Stream einen StatusCREATING, was bedeutet, dass er noch nicht einsatzbereit ist. Prüfen Sie den Status nach wenigen Minuten erneut. Jetzt sollten Sie eine Ausgabe ähnlich dem folgenden Beispiel erhalten:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

Diese Ausgabe enthält Informationen, die Sie für dieses Tutorial nicht benötigen. Die wichtigsten Informationen für den Moment sind"StreamStatus": "ACTIVE", dass Sie wissen, dass der Stream bereit ist, verwendet zu werden, und die Informationen zu dem einzelnen Shard, den Sie angefordert haben. Sie können die Existenz Ihres neuen Streams auch mit dem Befehl list-streams überprüfen, wie in der folgenden Abbildung dargestellt:

aws kinesis list-streams

Ausgabe:

{ "StreamNames": [ "Foo" ] }

Schritt 2: Einen Datensatz erstellen

Jetzt, da Sie über einen aktiven Stream verfügen, können Sie Daten an diesen senden. In diesem Tutorial verwenden Sie einen möglichst einfachen Befehl, put-record. Dieser sendet einen einzelnen Datensatz mit dem Text „testdata” in den Stream.

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

Ist der Befehl erfolgreich, wird eine Ausgabe zurückgegeben, die wie folgt aussehen sollte:

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

Herzlichen Glückwunsch, Sie haben gerade Daten zu einem Stream hinzugefügt! Als Nächstes erfahren Sie, wie Sie Daten aus dem Stream abrufen können.

Schritt 3: Holen Sie sich den Datensatz

GetShardIterator

Bevor Sie Daten aus dem Stream abrufen können, müssen Sie den Shard-Iterator für den Shard abrufen, an dem Sie interessiert sind. Ein Shard-Iterator stellt die Position des Streams und des Shards dar, aus denen der Konsument (in diesem Fall der Befehl get-record) Daten ausliest. Sie verwenden den get-shard-iterator Befehl wie folgt:

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

Denken Sie daran, dass API hinter den aws kinesis Befehlen ein Kinesis Data Streams steckt. Wenn Sie sich also für einen der angezeigten Parameter interessieren, können Sie im GetShardIteratorAPIReferenzthema mehr darüber lesen. Eine erfolgreiche Ausführung führt zu einer Ausgabe, die dem folgenden Beispiel ähnelt:

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

Die lange Zeichenfolge scheinbar zufälliger Zeichen ist der Shard-Iterator (Ihrer lautet jedoch anders). Sie müssen den Shard-Iterator kopieren/in den Befehl get einfügen, der als Nächstes gezeigt wird. Shard-Iteratoren verfügen über eine Gültigkeitsdauer von 300 Sekunden. Dies sollte ausreichen, um den Shard-Iterator zu kopieren und in den nächsten Befehl einzufügen. Sie müssen alle Zeilenumbrüche aus Ihrem Shard-Iterator entfernen, bevor Sie sie in den nächsten Befehl einfügen. Wenn Sie eine Fehlermeldung erhalten, dass der Shard-Iterator nicht mehr gültig ist, führen Sie den Befehl erneut aus. get-shard-iterator

GetRecords

Der get-records Befehl ruft Daten aus dem Stream ab und wird in einem Aufruf von GetRecordsin den Kinesis Data Streams aufgelöst. API Der Shard-Iterator gibt die Position im Shard an, ab der Datensätze sequenziell ausgelesen werden sollen: Wenn keine Datensätze in dem Teil des Shards verfügbar sind, auf die der Iterator zeigt, gibt GetRecords eine leere Liste zurück. Möglicherweise sind mehrere Aufrufe erforderlich, um zu einem Teil des Shards zu gelangen, der Datensätze enthält.

Im folgenden Beispiel für den get-records Befehl:

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

Wenn Sie dieses Tutorial von einem UNIX-Befehlsprozessor wie der Bash aus ausführen, können Sie die Erfassung des Shard-Iterators mit einem verschachtelten Befehl wie dem Folgenden automatisieren:

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

Wenn Sie dieses Tutorial von einem System aus ausführen, das dies unterstützt PowerShell, können Sie die Erfassung des Shard-Iterators mit einem Befehl wie dem folgenden automatisieren:

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

Das erfolgreiche Ergebnis des get-records Befehls fordert Datensätze aus Ihrem Stream für den Shard an, den Sie beim Abrufen des Shard-Iterators angegeben haben, wie im folgenden Beispiel:

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

Beachten Sie, dass get-records dies oben als Anfrage beschrieben wurde, was bedeutet, dass Sie möglicherweise keine oder mehr Datensätze erhalten, selbst wenn Ihr Stream Datensätze enthält. Alle zurückgegebenen Datensätze stellen möglicherweise nicht alle Datensätze dar, die sich derzeit in Ihrem Stream befinden. Das ist normal, und der Produktionscode fragt den Stream in angemessenen Intervallen nach Datensätzen ab. Diese Abfragegeschwindigkeit hängt von Ihren spezifischen Anforderungen an das Anwendungsdesign ab.

In Ihren Aufzeichnungen in diesem Teil des Tutorials werden Sie feststellen, dass die Daten unbrauchbar zu sein scheinen — und es ist nicht der Klartext, den testdata wir gesendet haben. Dies liegt an der Art und Weise, auf die put-record Base64-Codierung verwendet, um Ihnen das Senden von Binärdaten zu ermöglichen. Die Kinesis Data Streams Streams-Unterstützung in bietet jedoch AWS CLI keine Base64-Decodierung, da die Base64-Decodierung von rohen Binärinhalten, die auf stdout gedruckt werden, auf bestimmten Plattformen und Terminals zu unerwünschtem Verhalten und potenziellen Sicherheitsproblemen führen kann. Wenn Sie einen Base64-Decoder verwenden (z. B. https://www.base64decode.org/), um dGVzdGRhdGE= manuell zu decodieren, sehen Sie, dass es tatsächlich testdata ist. Dies ist für dieses Tutorial ausreichend, da der in der Praxis nur selten zur Nutzung von Daten verwendet wird. AWS CLI Häufiger wird es verwendet, um den Status des Streams zu überwachen und Informationen abzurufen, wie zuvor gezeigt (describe-streamundlist-streams). Weitere Informationen zu finden Sie unter Entwickeln benutzerdefinierter Verbraucher mit gemeinsamem Durchsatz unter Verwendung von KCL. KCL

get-recordsgibt nicht immer alle Datensätze im angegebenen Stream/Shard zurück. Verwenden Sie in diesem Fall den NextShardIterator aus dem letzten Ergebnis, um die nächste Datensatzgruppe abzurufen. Wenn mehr Daten in den Stream eingegeben würden, was bei Produktionsanwendungen der Normalfall ist, könnten Sie jedes Mal nach nutzbaren Daten suchen. get-records Wenn Sie jedoch innerhalb der Lebensdauer von 300 Sekunden nicht get-records mit dem nächsten Shard-Iterator aufrufen, erhalten Sie eine Fehlermeldung, und Sie müssen den get-shard-iterator Befehl verwenden, um einen neuen Shard-Iterator zu erhalten.

In dieser Ausgabe wird auch angegebenMillisBehindLatest, wie viele Millisekunden die Antwort der GetRecordsOperation von der Spitze des Streams ausgeht, was angibt, wie weit der Verbraucher hinter der aktuellen Zeit zurückliegt. Der Wert Null gibt an, dass die Datenverarbeitung aktuell ist und dass zurzeit keine neuen zu verarbeitenden Datensätze vorhanden sind. In diesem Tutorial sehen Sie möglicherweise eine recht große Zeit, wenn Sie sich die Zeit dafür genommen haben, stets mitzulesen. Standardmäßig verbleiben Datensätze 24 Stunden lang in einem Stream und warten darauf, dass Sie sie abrufen. Dieser Zeitraum wird als Aufbewahrungszeitraum bezeichnet und ist bis zu 365 Tage konfigurierbar.

Ein erfolgreiches get-records Ergebnis wird immer angezeigt, NextShardIterator auch wenn sich derzeit keine weiteren Datensätze im Stream befinden. Hierbei handelt es sich um ein Abrufmodell, bei dem davon ausgegangen wird, dass ein Produzent zu einem bestimmten Zeitpunkt möglicherweise mehr Datensätze in den Stream einfügt. Sie können zwar Ihre eigenen Abfrageroutinen schreiben, aber wenn Sie die zuvor genannten KCL für die Entwicklung von Verbraucheranwendungen verwenden, wird diese Abfrage für Sie erledigt.

Wenn Sie aufrufen, get-records bis der Stream und der Shard, aus dem Sie abrufen, keine Datensätze mehr enthalten, erhalten Sie eine Ausgabe mit leeren Datensätzen, die dem folgenden Beispiel ähnelt:

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

Schritt 4: Bereinigen

Lösche deinen Stream, um Ressourcen freizugeben und ungewollte Gebühren für dein Konto zu vermeiden. Tun Sie dies jedes Mal, wenn Sie einen Stream erstellt haben und ihn nicht verwenden werden, da Gebühren pro Stream anfallen, unabhängig davon, ob Sie Daten mit ihm übertragen und abrufen oder nicht. Der Befehl zum Aufräumen lautet wie folgt:

aws kinesis delete-stream --stream-name Foo

Erfolg führt zu keiner Ausgabe. Verwenden Siedescribe-stream, um den Fortschritt des Löschvorgangs zu überprüfen:

aws kinesis describe-stream-summary --stream-name Foo

Wenn Sie diesen Befehl unmittelbar nach dem Löschbefehl ausführen, erhalten Sie eine Ausgabe, die dem folgenden Beispiel ähnelt:

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

Nachdem der Stream vollständig gelöscht wurde, führt describe-stream zum Fehler „nicht gefunden”.

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.