メニュー
Amazon Kinesis Streams
開発者ガイド

基本的なストリームオペレーションの実行

このセクションでは、AWS CLI を使用した、コマンドラインからの Kinesis stream の基本的な使用方法について説明します。「Amazon Kinesis Streams の主要なコンセプト」と「チュートリアル: Amazon Kinesis Streams を使用したウェブトラフィックの可視化」で説明されている概念を理解している必要があります。

注記

Kinesis Streams は AWS の無料利用枠の対象に該当しないため、ストリームを作成した後、Kinesis Streams の使用に関するわずかな料金が発生します。このチュートリアルを終了したら、AWS リソースを削除して料金が発生しないようにしてください。詳細については、「手順 4: クリーンアップ」を参照してください。

ステップ 1: ストリームを作成する

最初のステップは、ストリームを作成し、正常に作成されたことを確認することです。次のコマンドを使用して、「Foo」という名前のストリームを作成します。

Copy
aws kinesis create-stream --stream-name Foo --shard-count 1

--shard-count パラメータは必須であり、チュートリアルのこの部分では、ストリームで 1 個のシャードを使用しています。次に、次のコマンドを実行して、ストリーム作成の進行状況を確認します。

Copy
aws kinesis describe-stream --stream-name Foo

次の例のような出力が得られます。

{
    "StreamDescription": {
        "StreamStatus": "CREATING",
        "StreamName": "Foo",
        "StreamARN": "arn:aws:kinesis:us-west-2:account-id:stream/Foo",
        "Shards": []
    }
}

この例では、ストリームのステータスは CREATING であり、使用する準備が完全には整っていないことを意味します。しばらくしてからもう一度調べると、次の例のような出力が表示されます。

{
    "StreamDescription": {
        "StreamStatus": "ACTIVE",
        "StreamName": "Foo",
        "StreamARN": "arn:aws:kinesis:us-west-2:account-id:stream/Foo",
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "EndingHashKey": "170141183460469231731687303715884105727",
                    "StartingHashKey": "0"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49546986683135544286507457935754639466300920667981217794"
                }
            }
        ]
    }
}

この出力には、このチュートリアルで気にする必要がない情報も含まれています。ここで重要な項目は "StreamStatus": "ACTIVE" であり、ストリームを使用する準備ができたことと、リクエストした単一のシャードに関する情報が示されています。また、次に示すように、list-streams コマンドを使用して新しいストリームの存在を確認することもできます。

Copy
aws kinesis list-streams

出力:

{
    "StreamNames": [
        "Foo"
    ]
}

ステップ 2: レコードを入力する

アクティブなストリームができたら、データを入力できます。このチュートリアルでは、最もシンプルなコマンド put-record を使用して、"testdata" というテキストを含む単一のデータレコードをストリームに入力します。

Copy
aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

このコマンドが成功すると、出力は次の例のようになります。

{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49546986683135544286507457936321625675700192471156785154"
}

これで、ストリームにデータを追加できました。次にストリームからデータを取得する方法を説明します。

ステップ 3: レコードを取得する

ストリームからデータを取得するには、対象となるシャードのシャードイテレーターを取得する必要があります。シャードイテレーターは、コンシューマー(ここでは get-record コマンド)が読み取るストリームとシャードの位置を表します。次のように get-shard-iterator コマンドを使用します。

Copy
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

aws kinesis のコマンドにはその背後に Kinesis Streams API があります。示されているパラメータに関心がある場合は、GetShardIterator API のリファレンスのトピックを参照してください。実行に成功すると、出力は次の例のようになります(出力全体を表示するには、水平にスクロールします)。

{
    "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg="
}

ランダムに見える長い文字列がシャードイテレーターです(お客様のシャードイテレーターはこれとは異なります)。このシャードイテレーターをコピーして、次に示す get コマンドに貼り付ける必要があります。シャードイテレーターの有効期間は 300 秒です。これは、シャードイテレーターをコピーして次のコマンドに貼り付けるのに十分な時間です。次のコマンドに貼り付ける前に、シャードイテレーターから改行を削除する必要があることに注意してください。シャードイテレーターが有効ではないことを示すエラーメッセージが表示された場合は、もう一度 get-shard-iterator コマンドを実行します。

get-records コマンドは、ストリームからデータを取得し、Kinesis Streams API の GetRecords 呼び出しとして解決されます。シャードイテレーターは、データレコードの逐次読み取りを開始する、シャード内の位置を指定します。イテレーターが指定するシャードの位置にレコードがない場合、GetRecords は空のリストを返します。シャード内のレコードが含まれる位置に到達するために、複数の呼び出しが必要になる場合があることに注意してください。

get-records コマンドの例を次に示します(出力全体を表示するには、水平にスクロールします)。

Copy
aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

bash など Unix タイプのコマンドプロセッサからこのチュートリアルを実行する場合は、次のように入れ子にしたコマンドを使用して、シャードイテレーターの取得を自動化できます(横方向にスクロールしてコマンド全体を表示)。

Copy
SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

PowerShell をサポートするシステムからこのチュートリアルを実行する場合、次のようなコマンドを使用してシャードのイテレータの取得を自動化できます (横方向にスクロールしてコマンド全体を表示)。

Copy
aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

get-records コマンドが正常に終了すると、次の例のように、シャードイテレーターを取得するときに指定したシャードに対応するストリーム内のレコードがリクエストされます(出力全体を表示するには、水平にスクロールします)。

{
  "Records":[ {
    "Data":"dGVzdGRhdGE=",
    "PartitionKey":"123”,
    "ApproximateArrivalTimestamp": 1.441215410867E9,
    "SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
  } ],
  "MillisBehindLatest":24000,
  "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}

上記で get-records を "リクエスト" と説明しましたが、これは、ストリーム内にレコードが存在する場合もゼロ件以上のレコードが返される可能性があり、返されたどのレコードも現在ストリーム内にあるすべてのレコードを表していない可能性があることを意味します。これは完全に正常で、本稼働用のコードではストリームに対し、適切な間隔でレコードに対するポーリングを行います(このポーリング速度は、個々のアプリケーションの設計要件によって異なります)。

チュートリアルのこのパートで、レコードについて最初に気付くのは、データが文字化けのように見える点でしょう。送信した testdata のクリアテキストではありません。これは、バイナリデータを送信できるように、put-record では Base64 エンコーディングを使用しているためです。ただし、AWS CLI での Kinesis Streams のサポートでは、Base64 デコーディングを提供していません。これは、Base64 デコーディングされた raw バイナリコンテンツを stdout に出力すると、特定のプラットフォームやターミナルで、意図しない動作やセキュリティ上の問題が発生する可能性があるためです。Base64 デコーダ(https://www.base64decode.org/ など)を使用して手動で dGVzdGRhdGE= をデコードすると、これが実際に testdata であることを確認できます。このチュートリアルではこれで問題ありません。なぜなら、実際には、AWS CLI を使用してデータを利用することはまれであり、通常は前に示したように(describe-stream および list-streams)、ストリームの状態をモニタリングしたり、情報を取得したりするために使用されるからです。後のチュートリアルでは、Kinesis クライアントライブラリ (KCL) を使用して、本稼働品質のコンシューマーアプリケーションを構築する方法を示し、Base64 の処理も検討します。KCL の詳細については、「Kinesis Client Library を使用した Amazon Kinesis Streams コンシューマーの開発」を参照してください。

get-records によって、常にストリーム/シャード内のすべてのレコードが返されるわけではありません。このような場合は、最後の結果から NextShardIterator を使用して、次のレコードのセットを取得します。したがって、大量のデータがストリームに入力されていた場合(本稼働アプリケーションでの通常の状況)、毎回 get-records を使用してデータのポーリングを継続できます。ただし、300 秒のシャードイテレーターの有効期間内に次のシャードイテレーターを使用して get-records を呼び出した場合、エラーメッセージが表示され、get-shard-iterator コマンドを使用して最新のシャードイテレーターを取得する必要があります。

この出力には、MillisBehindLatest も含まれています。これは、GetRecords オペレーションのストリームの末尾からの応答をミリ秒で表した数値であり、コンシューマーの時間の現在の時刻からの遅れを示します。値ゼロはレコード処理が追いついて、現在処理する新しいレコードは存在しないことを示します。このチュートリアルの場合は、作業を進めるのに時間をかけていると、この数値がかなり大きくなる可能性があります。これは問題ではなく、データレコードはストリームに 24 時間留まり、取得されるのを待ちます。この期間は保持期間と呼ばれ、168 時間(7 日)まで設定可能です。

get-records が成功したときの結果は、現在ストリームにこれ以上レコードが見つからない場合でも常に NextShardIterator です。これは、プロデューサーがどの時点でもストリームにレコードを入力している可能性があることを前提としたポーリングモデルです。独自のポーリングルーチンを記述することもできますが、開発中のコンシューマーアプリケーションで、前に説明した KCL を使用している場合、このポーリングによって処理が実行されます。

レコードをプルする対象のストリームまたはシャードにそれ以上レコードがなくなるまで get-records を呼び出すと、次の例のような空のレコードの出力が表示されます(出力全体を表示するには、水平にスクロールします)。

{
    "Records": [],
    "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk="
}

手順 4: クリーンアップ

最後に、ストリームを削除してリソースを解放し、前に説明したように、アカウントに対する意図しない料金が発生することを回避できます。ストリームを作成したが、使用する予定がない場合は、必ず実際にこれを行ってください。ストリームでデータを入力および取得したかどうかにかかわらず、ストリームごとに料金が発生するためです。クリーンアップコマンドはシンプルです。

Copy
aws kinesis delete-stream --stream-name Foo

成功しても出力はないため、describe-stream を使用して削除の進行状況を確認できます。

Copy
aws kinesis describe-stream --stream-name Foo

delete コマンドの直後にこのコマンドを実行する場合、次の例のような出力が表示されます。

{
    "StreamDescription": {
        "StreamStatus": "DELETING",
        "StreamName": "Foo",
        "StreamARN": "arn:aws:kinesis:us-west-2:account-id:stream/Foo",
        "Shards": []
    }
}

ストリームが完全に削除されると、describe-stream は「not found」エラーを返します。

A client error (ResourceNotFoundException) occurred when calling the DescribeStream operation: 
Stream Foo under account 112233445566 not found.