

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# チュートリアル: を使用して基本的な Kinesis Data Streams オペレーションを実行する AWS CLI
<a name="fundamental-stream"></a>

このセクションでは、 AWS CLIを使用した、コマンドラインからの Kinesis data stream の基本的な使用方法について説明します。[Amazon Kinesis Data Streams の用語と概念](key-concepts.md)で説明されている概念を理解している必要があります。

**注記**  
ストリームを作成すると、Kinesis Data Streams は AWS 無料利用枠の対象外であるため、Kinesis Data Streams の使用に対して アカウントにわずかな料金が発生します。このチュートリアルが終了したら、 AWS リソースを削除して料金の発生を停止します。詳細については、「[ステップ 4: クリーンアップする](#clean-up)」を参照してください。

**Topics**
+ [ステップ 1: ストリームを作成する](#create-stream)
+ [ステップ 2: レコードを入力する](#put-record)
+ [ステップ 3: レコードを取得する](#get-records)
+ [ステップ 4: クリーンアップする](#clean-up)

## ステップ 1: ストリームを作成する
<a name="create-stream"></a>

 最初のステップは、ストリームを作成し、正常に作成されたことを確認することです。次のコマンドを使用して、Fooという名前のストリームを作成します。

```
aws kinesis create-stream --stream-name Foo
```

次に、次のコマンドを実行して、ストリーム作成の進行状況を確認します。

```
aws kinesis describe-stream-summary --stream-name Foo
```

次の例のような出力が得られます。

```
{
    "StreamDescriptionSummary": {
        "StreamName": "Foo",
        "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo",
        "StreamStatus": "CREATING",
        "RetentionPeriodHours": 48,
        "StreamCreationTimestamp": 1572297168.0,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "OpenShardCount": 3,
        "ConsumerCount": 0
    }
}
```

 この例では、ストリームのステータスは CREATING であり、使用する準備がまだ整っていないことを意味します。しばらくしてからもう一度調べると、次の例のような出力が表示されます。

```
{
    "StreamDescriptionSummary": {
        "StreamName": "Foo",
        "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 48,
        "StreamCreationTimestamp": 1572297168.0,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "OpenShardCount": 3,
        "ConsumerCount": 0
    }
}
```

この出力には、このチュートリアルで必要がない情報も含まれています。ここで重要な項目は `"StreamStatus": "ACTIVE"` であり、ストリームを使用する準備ができたことと、リクエストした単一のシャードに関する情報が示されています。また、次に示すように、`list-streams` コマンドを使用して新しいストリームの存在を確認することもできます。

```
aws kinesis list-streams
```

出力:

```
{
    "StreamNames": [
        "Foo"
    ]
}
```

## ステップ 2: レコードを入力する
<a name="put-record"></a>

 アクティブなストリームができたら、データを入力できます。このチュートリアルでは、最もシンプルなコマンド `put-record` を使用して、"testdata" というテキストを含む単一のデータレコードをストリームに入力します。

```
aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
```

 このコマンドが成功すると、出力は次の例のようになります。

```
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49546986683135544286507457936321625675700192471156785154"
}
```

これで、ストリームにデータを追加できました。次にストリームからデータを取得する方法を説明します。

## ステップ 3: レコードを取得する
<a name="get-records"></a>

**GetShardIterator**

 ストリームからデータを取得するには、対象となるシャードのシャードイテレーターを取得する必要があります。シャードイテレーターは、コンシューマー (ここでは `get-record` コマンド) が読み取るストリームとシャードの位置を表します。次のように `get-shard-iterator` コマンドを使用します。

```
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
```

`aws kinesis` コマンドの背後には Kinesis Data Streams API があります。示されているパラメータに関心がある場合は、 [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html) API のリファレンスのトピックを参照してください。実行が成功すると、出力は次の例のようになります。

```
{
    "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg="
}
```

ランダムに見える長い文字列がシャードイテレーターです (お客様のシャードイテレーターはこれとは異なります)。このシャードイテレーターをコピーして、次に示す get コマンドに貼り付ける必要があります。シャードイテレーターの有効期間は 300 秒です。これは、シャードイテレーターをコピーして次のコマンドに貼り付けるのに十分な時間です。次のコマンドに貼り付ける前に、シャードイテレーターから改行を削除する必要があります。シャードイテレーターが有効ではないことを示すエラーメッセージが表示された場合は、もう一度 `get-shard-iterator` コマンドを実行します。

**GetRecords**

`get-records` コマンドはストリームからデータを取得し、Kinesis Data Streams API での [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) の呼び出しに解決します。シャードイテレーターは、データレコードの逐次読み取りを開始する、シャード内の位置を指定します。イテレーターが指定するシャードの位置にレコードがない場合、`GetRecords` は空のリストを返します。シャード内のレコードが含まれる位置に到達するために、複数の呼び出しが必要になる場合があります。

次の `get-records` コマンドの例で以下の操作を行います。

```
aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=
```

 bash など Unix タイプのコマンドプロセッサからこのチュートリアルを実行する場合は、次のように入れ子にしたコマンドを使用して、シャードイテレーターの取得を自動化できます。

```
SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator')

aws kinesis get-records --shard-iterator $SHARD_ITERATOR
```

PowerShell をサポートするシステムからこのチュートリアルを実行する場合、次のようなコマンドを使用してシャードのイテレータの取得を自動化できます。

```
aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])
```

`get-records` コマンドが正常に終了すると、次の例のように、シャードイテレーターを取得するときに指定したシャードに対応するストリーム内のレコードがリクエストされます。

```
{
  "Records":[ {
    "Data":"dGVzdGRhdGE=",
    "PartitionKey":"123”,
    "ApproximateArrivalTimestamp": 1.441215410867E9,
    "SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
  } ],
  "MillisBehindLatest":24000,
  "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}
```

上記で `get-records` を*リクエスト*として説明しましたが、これは、ストリーム内にレコードが存在する場合でもゼロ件以上のレコードが返される可能性があることを意味します。返されるレコードは、ストリーム内のすべてのレコードを現在表すとは限りません。これは正常であり、本番コードは適切な間隔でストリームをポーリングしてレコードがないか調べます。このポーリング速度は、特定のアプリケーション設計要件によって異なります。

チュートリアルのこのパートのレコードでは、データがゴミのように見えることがわかります。これは私たちが送ったクリアテキスト `testdata` ではありません。これは、バイナリデータを送信できるように、`put-record` では Base64 エンコーディングを使用しているためです。ただし、 の Kinesis Data Streams のサポート AWS CLI では Base64 *デコード*は提供されません。これは、stdout に出力された未加工のバイナリコンテンツへの Base64 デコードが、特定のプラットフォームやターミナルで望ましくない動作や潜在的なセキュリティ問題を引き起こす可能性があるためです。Base64 デコーダ ([https://www.base64decode.org/](https://www.base64decode.org/) など) を使用して手動で `dGVzdGRhdGE=` をデコードすると、これが実際に `testdata` であることを確認できます。これはこのチュートリアルのために十分です。実際には、 AWS CLI がデータの消費に使用されることはほとんどないためです。多くの場合、前述のように (`describe-stream` と `list-streams`)、ストリームの状態をモニタリングし、情報を取得するために使用します。KCL の詳細については、[KCL を使用したスループット共有カスタムコンシューマーの開発](https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html)を参照してください。

`get-records` によって、常に指定されたストリーム/シャード内のすべてのレコードが返されるわけではありません。このような場合は、最後の結果から `NextShardIterator` を使用して、次のレコードのセットを取得します。したがって、大量のデータがストリームに入力されていた場合 これは本稼働アプリケーションでの通常の状況ですが、毎回 `get-records` を使用してデータのポーリングを継続できます。ただし、300 秒のシャードイテレーターの有効期間内に次のシャードイテレーターを使用して `get-records` を呼び出した場合、エラーメッセージが表示され、`get-shard-iterator` コマンドを使用して最新のシャードイテレーターを取得する必要があります。

この出力には、`MillisBehindLatest` も含まれています。これは、ストリームの末尾から [GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) オペレーションのレスポンスまでの時間 (ミリ秒) であり、コンシューマーの時間の現在の時刻からの遅れを示します。値ゼロはレコード処理が追いついて、現在処理する新しいレコードは存在しないことを示します。このチュートリアルの場合は、作業を進めるのに時間をかけていると、この数値がかなり大きくなる可能性があります。デフォルトでは、データレコードはストリームに 24 時間留まり、取得されるのを待ちます。この期間は保持期間と呼ばれ、365 日 まで設定可能です。

`get-records` が成功したときの結果は、現在ストリームにこれ以上レコードが見つからない場合でも常に `NextShardIterator` です。これは、プロデューサーがどの時点でもストリームにレコードを入力している可能性があることを前提としたポーリングモデルです。独自のポーリングルーチンを記述することもできますが、開発中のコンシューマーアプリケーションで、前に説明した KCL を使用している場合、このポーリングによって処理が実行されます。

レコードをプルする対象のストリームまたはシャードにそれ以上レコードがなくなるまで `get-records` を呼び出すと、次の例のような空のレコードの出力が表示されます。

```
{
    "Records": [],
    "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk="
}
```

## ステップ 4: クリーンアップする
<a name="clean-up"></a>

ストリームを削除してリソースを解放し、アカウントに対する意図しない料金が発生することを回避できます。ストリームを作成したが、使用する予定がない場合は、必ずこれを行ってください。ストリームでデータを入力および取得したかどうかにかかわらず、ストリームごとに料金が発生するためです。clean-up コマンドは次のとおりです。

```
aws kinesis delete-stream --stream-name Foo
```

 成功すると、出力は発生しません。削除の進行状況を確認するには、`describe-stream` を使用します。

```
aws kinesis describe-stream-summary --stream-name Foo
```

 delete コマンドの直後にこのコマンドを実行する場合、次の例のような出力が表示されます。

```
{
    "StreamDescriptionSummary": {
        "StreamName": "samplestream",
        "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream",
        "StreamStatus": "ACTIVE",
```

 ストリームが完全に削除されると、`describe-stream` はnot foundエラーを返します。

```
A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: 
Stream Foo under account 123456789012 not found.
```