チュートリアル: を使用して基本的な Kinesis Data Streams オペレーションを実行する AWS CLI - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

チュートリアル: を使用して基本的な Kinesis Data Streams オペレーションを実行する AWS CLI

このセクションでは、 AWS CLIを使用した、コマンドラインからの Kinesis data stream の基本的な使用方法について説明します。Amazon Kinesis Data Streams の用語と概念で説明されている概念を理解している必要があります。

注記

ストリームを作成すると、Kinesis Data Streams は AWS 無料利用枠の対象外であるため、Kinesis Data Streams の使用に対して アカウントにわずかな料金が発生します。このチュートリアルが終了したら、 AWS リソースを削除して料金の発生を停止します。詳細については、「ステップ 4: クリーンアップする」を参照してください。

ステップ 1: ストリームを作成する

最初のステップは、ストリームを作成し、正常に作成されたことを確認することです。次のコマンドを使用して、Fooという名前のストリームを作成します。

aws kinesis create-stream --stream-name Foo

次に、次のコマンドを実行して、ストリーム作成の進行状況を確認します。

aws kinesis describe-stream-summary --stream-name Foo

次の例のような出力が得られます。

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

この例では、ストリームのステータスは です。つまりCREATING、まだ使用する準備ができていません。しばらくしてからもう一度調べると、次の例のような出力が表示されます。

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

この出力には、このチュートリアルで不要な情報が含まれています。現在の重要な情報は です。これは"StreamStatus": "ACTIVE"、ストリームを使用する準備ができていること、およびリクエストした単一のシャードに関する情報を示します。また、次に示すように、list-streams コマンドを使用して新しいストリームの存在を確認することもできます。

aws kinesis list-streams

出力:

{ "StreamNames": [ "Foo" ] }

ステップ 2: レコードを配置する

アクティブなストリームができたら、データを入力できます。このチュートリアルでは、最もシンプルなコマンド put-record を使用して、"testdata" というテキストを含む単一のデータレコードをストリームに入力します。

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

このコマンドが成功すると、出力は次の例のようになります。

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

これで、ストリームにデータを追加できました。次にストリームからデータを取得する方法を説明します。

ステップ 3: レコードを取得する

GetShardIterator

ストリームからデータを取得する前に、関心のあるシャードのシャードイテレーターを取得する必要があります。シャードイテレーターは、コンシューマー (ここでは get-record コマンド) が読み取るストリームとシャードの位置を表します。get-shard-iterator コマンドは次のように使用します。

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

aws kinesis コマンドには Kinesis Data Streams が背API後にあるため、表示されるパラメータのいずれかに関心がある場合は、GetShardIteratorAPIリファレンストピックでそのパラメータについて読むことができます。正常に実行すると、次の例のような出力になります。

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

ランダムに見える長い文字列がシャードイテレーターです (お客様のシャードイテレーターはこれとは異なります)。シャードイテレーターをコピーして、次に示す get コマンドに貼り付ける必要があります。シャードイテレーターの有効期間は 300 秒です。これは、シャードイテレーターをコピーして次のコマンドに貼り付けるのに十分な時間です。次のコマンドに貼り付ける前に、シャードイテレーターから改行を削除する必要があります。シャードイテレーターが無効になったというエラーメッセージが表示された場合は、 get-shard-iterator コマンドを再度実行します。

GetRecords

get-records コマンドはストリームからデータを取得し、Kinesis Data Streams GetRecordsの への呼び出しを解決しますAPI。シャードイテレーターは、データレコードの逐次読み取りを開始する、シャード内の位置を指定します。イテレーターが指定するシャードの位置にレコードがない場合、GetRecords は空のリストを返します。レコードを含むシャードの一部に到達するには、複数の呼び出しが必要になる場合があります。

次のget-recordsコマンドの例では、次のコマンドを使用します。

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

bash などの Unix タイプのコマンドプロセッサからこのチュートリアルを実行している場合は、次のようなネストされたコマンドを使用してシャードイテレーターの取得を自動化できます。

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

をサポートするシステムからこのチュートリアルを実行している場合は PowerShell、次のようなコマンドを使用してシャードイテレーターの取得を自動化できます。

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

get-records コマンドが成功すると、次の例のように、シャードイテレーターを取得したときに指定したシャードのレコードをストリームにリクエストします。

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

get-recordsリクエスト として上記で説明されていることに注意してください。つまり、ストリームにレコードがある場合でも、0 個以上のレコードを受け取る可能性があります。返されるレコードは、現在ストリームにあるすべてのレコードを表すとは限りません。これは正常であり、本番コードは適切な間隔でストリームにレコードをポーリングします。このポーリング速度は、特定のアプリケーション設計要件によって異なります。

チュートリアルのこの部分のレコードでは、データがガベージであるように見え、testdata送信されたクリアテキストではないことがわかります。これは、バイナリデータを送信できるように、put-record では Base64 エンコーディングを使用しているためです。ただし、 の Kinesis Data Streams のサポート AWS CLI では、Base64 デコードは提供されません。これは、stdout に出力された未加工のバイナリコンテンツへの Base64 デコードは、特定のプラットフォームやターミナルで望ましくない動作や潜在的なセキュリティ問題を引き起こす可能性があるためです。Base64 デコーダ (https://www.base64decode.org/ など) を使用して手動で dGVzdGRhdGE= をデコードすると、これが実際に testdata であることを確認できます。実際には、 がデータの使用にほとんど使用されないため、このチュートリアルではこれで十分 AWS CLI です。以前に示したように、ストリームの状態をモニタリングし、情報を取得するためによく使用されます (describe-streamlist-streams)。の詳細についてはKCL、「 を使用した共有スループットのカスタムコンシューマーの開発KCL」を参照してください。

get-records は、指定されたストリーム/シャード内のすべてのレコードを返すとは限りません。このような場合は、最後の結果から NextShardIterator を使用して、次のレコードのセットを取得します。本番稼働用アプリケーションの通常の状況であるストリームにより多くのデータが取り込まれていた場合は、get-records毎回 を使用してデータをポーリングし続けることができます。ただし、300 秒のシャードイテレーターの有効期間内に次のシャードイテレーターget-recordsを使用して を呼び出さない場合、エラーメッセージが表示され、 get-shard-iterator コマンドを使用して新しいシャードイテレーターを取得する必要があります。

また、この出力には も表示されます。これはMillisBehindLatestGetRecordsオペレーションのレスポンスがストリームのティップからのミリ秒数で、コンシューマーが現在どのくらい遅れているかを示します。値ゼロはレコード処理が追いついて、現在処理する新しいレコードは存在しないことを示します。このチュートリアルの場合は、作業を進めるのに時間をかけていると、この数値がかなり大きくなる可能性があります。デフォルトでは、データレコードはストリームに 24 時間保持され、取得されるのを待ちます。この期間は保持期間と呼ばれ、365 日 まで設定可能です。

ストリームに現在レコードがない場合NextShardIteratorでも、正常なget-records結果には常に が含まれます。これは、プロデューサーがどの時点でもストリームにレコードを入力している可能性があることを前提としたポーリングモデルです。独自のポーリングルーチンを作成できますが、前述の をコンシューマーアプリケーションの開発KCLに使用すると、このポーリングが処理されます。

プル元のストリームとシャードにレコードがなくなるget-recordsまで を呼び出すと、次の例のような空のレコードを含む出力が表示されます。

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

ステップ 4: クリーンアップする

ストリームを削除してリソースを解放し、 アカウントへの意図しない課金を回避します。ストリームを作成して使用しない場合は、いつでもこれを行います。これは、ストリームごとに料金が発生し、データを配置して取得するかどうかにかかわらず発生するためです。クリーンアップコマンドは次のとおりです。

aws kinesis delete-stream --stream-name Foo

成功すると、出力は行われません。describe-stream を使用して削除の進行状況を確認します。

aws kinesis describe-stream-summary --stream-name Foo

delete コマンドの直後にこのコマンドを実行すると、次の例のような出力が表示されます。

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

ストリームが完全に削除されると、describe-stream はnot foundエラーを返します。

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.