AWS CLI を使用した基本的な Kinesis Data Stream オペレーションの実行 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS CLI を使用した基本的な Kinesis Data Stream オペレーションの実行

このセクションでは、AWS CLI を使用した、コマンドラインからの Kinesis data stream の基本的な使用方法について説明します。Amazon Kinesis Data Streams の用語と概念で説明されている概念を理解している必要があります。

注記

Kinesis Data Streams は AWS の無料利用枠の対象外であるため、ストリームの作成後は、Kinesis Data Streams の使用に対してアカウントに少額の料金が発生します。このチュートリアルを終了したら、AWS リソースを削除して料金が発生しないようにしてください。詳細については、ステップ 4: クリーンアップするを参照してください。

ステップ 1: ストリームを作成する

最初のステップは、ストリームを作成し、正常に作成されたことを確認することです。次のコマンドを使用して、Fooという名前のストリームを作成します。

aws kinesis create-stream --stream-name Foo

次に、次のコマンドを実行して、ストリーム作成の進行状況を確認します。

aws kinesis describe-stream-summary --stream-name Foo

次の例のような出力が得られます。

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

この例では、ストリームのステータスは CREATING であり、使用する準備が完全には整っていないことを意味します。しばらくしてからもう一度調べると、次の例のような出力が表示されます。

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

この出力には、このチュートリアルで気にする必要がない情報も含まれています。ここで重要な項目は "StreamStatus": "ACTIVE" であり、ストリームを使用する準備ができたことと、リクエストした単一のシャードに関する情報が示されています。また、次に示すように、list-streams コマンドを使用して新しいストリームの存在を確認することもできます。

aws kinesis list-streams

出力:

{ "StreamNames": [ "Foo" ] }

ステップ 2: レコードを入力する

アクティブなストリームができたら、データを入力できます。このチュートリアルでは、最もシンプルなコマンド put-record を使用して、"testdata" というテキストを含む単一のデータレコードをストリームに入力します。

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

このコマンドが成功すると、出力は次の例のようになります。

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

これで、ストリームにデータを追加できました。次にストリームからデータを取得する方法を説明します。

ステップ 3: レコードを取得する

GetShardIterator

ストリームからデータを取得するには、対象となるシャードのシャードイテレーターを取得する必要があります。シャードイテレーターは、コンシューマー (ここでは get-record コマンド) が読み取るストリームとシャードの位置を表します。次のように get-shard-iterator コマンドを使用します。

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

aws kinesis コマンドの背後には Kinesis Data Streams API があります。示されているパラメータに関心がある場合は、 GetShardIterator API のリファレンスのトピックを参照してください。実行に成功すると、出力は次の例のようになります (出力全体を表示するには、水平にスクロールします)。

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

ランダムに見える長い文字列がシャードイテレーターです (お客様のシャードイテレーターはこれとは異なります)。このシャードイテレーターをコピーして、次に示す get コマンドに貼り付ける必要があります。シャードイテレーターの有効期間は 300 秒です。これは、シャードイテレーターをコピーして次のコマンドに貼り付けるのに十分な時間です。次のコマンドに貼り付ける前に、シャードイテレーターから改行を削除する必要があることに注意してください。シャードイテレーターが有効ではないことを示すエラーメッセージが表示された場合は、もう一度 get-shard-iterator コマンドを実行します。

GetRecords

get-records コマンドはストリームからデータを取得し、Kinesis Data Streams API での GetRecords の呼び出しに解決します。シャードイテレーターは、データレコードの逐次読み取りを開始する、シャード内の位置を指定します。イテレーターが指定するシャードの位置にレコードがない場合、GetRecords は空のリストを返します。シャード内のレコードが含まれる位置に到達するために、複数の呼び出しが必要になる場合があることに注意してください。

get-records コマンドの例を次に示します (出力全体を表示するには、水平にスクロールします)。

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

bash など Unix タイプのコマンドプロセッサからこのチュートリアルを実行する場合は、次のように入れ子にしたコマンドを使用して、シャードイテレーターの取得を自動化できます (横方向にスクロールしてコマンド全体を表示)。

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

PowerShell をサポートするシステムからこのチュートリアルを実行する場合、次のようなコマンドを使用してシャードのイテレータの取得を自動化できます (横方向にスクロールしてコマンド全体を表示)。

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

get-records コマンドが正常に終了すると、次の例のように、シャードイテレーターを取得するときに指定したシャードに対応するストリーム内のレコードがリクエストされます (出力全体を表示するには、水平にスクロールします)。

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

上記で get-recordsリクエストとして説明しましたが、これは、ストリーム内にレコードが存在する場合でもゼロ件以上のレコードが返される可能性があり、返されたレコードはストリーム内に現存するすべてのレコードを示していない可能性があることを意味します。これは完全に正常で、本稼働用のコードではストリームに対し、適切な間隔でレコードに対するポーリングを行います (このポーリング速度は、個々のアプリケーションの設計要件によって異なります)。

チュートリアルのこのパートでレコードについて最初に気づくのは、データがゴミのように見えることです。これは私たちが送ったクリアテキスト testdata ではありません。これは、バイナリデータを送信できるように、put-record では Base64 エンコーディングを使用しているためです。ただし、AWS CLI での Kinesis Data Streams のサポートでは、Base64 デコーディングを提供していません。これは、Base64 デコーディングされた raw バイナリコンテンツを stdout に出力すると、特定のプラットフォームやターミナルで、意図しない動作やセキュリティ上の問題が発生する可能性があるためです。Base64 デコーダ (https://www.base64decode.org/ など) を使用して手動で dGVzdGRhdGE= をデコードすると、これが実際に testdata であることを確認できます。このチュートリアルではこれで問題ありません。なぜなら、実際には、AWS CLI を使用してデータを利用することはまれであり、通常は前に示したようにストリームの状態をモニタリングしたり、情報を取得したりするために使用されるからです (describe-stream および list-streams)。後のチュートリアルでは、Kinesis Client Library (KCL) を使用して、本稼働品質のコンシューマーアプリケーションを構築する方法を示し、Base64 の処理も検討します。KCL の詳細については、KCL を使用したスループット共有カスタムコンシューマーの開発を参照してください。

get-records によって、常にストリーム/シャード内のすべてのレコードが返されるわけではありません。このような場合は、最後の結果から NextShardIterator を使用して、次のレコードのセットを取得します。したがって、大量のデータがストリームに入力されていた場合 (本稼働アプリケーションでの通常の状況)、毎回 get-records を使用してデータのポーリングを継続できます。ただし、300 秒のシャードイテレーターの有効期間内に次のシャードイテレーターを使用して get-records を呼び出した場合、エラーメッセージが表示され、get-shard-iterator コマンドを使用して最新のシャードイテレーターを取得する必要があります。

この出力には、MillisBehindLatest も含まれています。これは、ストリームの末尾から GetRecords オペレーションのレスポンスまでの時間 (ミリ秒) であり、コンシューマーの時間の現在の時刻からの遅れを示します。値ゼロはレコード処理が追いついて、現在処理する新しいレコードは存在しないことを示します。このチュートリアルの場合は、作業を進めるのに時間をかけていると、この数値がかなり大きくなる可能性があります。これは問題ではなく、デフォルトでは、データレコードはストリームに 24 時間留まり、取得されるのを待ちます。この期間は保持期間と呼ばれ、365 日 まで設定可能です。

get-records が成功したときの結果は、現在ストリームにこれ以上レコードが見つからない場合でも常に NextShardIterator です。これは、プロデューサーがどの時点でもストリームにレコードを入力している可能性があることを前提としたポーリングモデルです。独自のポーリングルーチンを記述することもできますが、開発中のコンシューマーアプリケーションで、前に説明した KCL を使用している場合、このポーリングによって処理が実行されます。

レコードをプルする対象のストリームまたはシャードにそれ以上レコードがなくなるまで get-records を呼び出すと、次の例のような空のレコードの出力が表示されます (出力全体を表示するには、水平にスクロールします)。

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

ステップ 4: クリーンアップする

最後に、ストリームを削除してリソースを解放し、前に説明したように、アカウントに対する意図しない料金が発生することを回避できます。ストリームを作成したが、使用する予定がない場合は、必ず実際にこれを行ってください。ストリームでデータを入力および取得したかどうかにかかわらず、ストリームごとに料金が発生するためです。クリーンアップコマンドはシンプルです。

aws kinesis delete-stream --stream-name Foo

成功しても出力はないため、describe-stream を使用して削除の進行状況を確認できます。

aws kinesis describe-stream-summary --stream-name Foo

delete コマンドの直後にこのコマンドを実行する場合、次の例のような出力部分が表示される可能性があります。

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

ストリームが完全に削除されると、describe-stream はnot foundエラーを返します。

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.