Amazon DynamoDB 用 Kinesis Data Streams の開始方法 - Amazon DynamoDB

Amazon DynamoDB 用 Kinesis Data Streams の開始方法

このセクションでは、Amazon DynamoDB コンソール、AWS Command Line Interface (AWS CLI)、および API を活用して Amazon DynamoDB 用の Amazon Kinesis Data Streams テーブルを使用する方法について説明します。

これらの例はすべて、DynamoDB の使用開始チュートリアルの一部として作成された Music DynamoDB テーブルを使用しています。

コンシューマーを構築し、Kinesis データストリームを他の AWS のサービスに接続する詳細方法については、「Amazon Kinesis Data Streams デベロッパーガイド」の「Kinesis Data Streams からのデータの読み込み」を参照してください。

注記

KDS シャードを初めて使用するときは、使用パターンに合わせてシャードをスケールアップまたはスケールダウンするように設定することをお勧めします。使用パターンに関するデータをさらに蓄積したら、それに合わせてストリーム内のシャードを調整できます。

Console
  1. AWS Management Console にサインインし、Kinesis コンソール (https://console.aws.amazon.com/kinesis/) を開きます。

  2. [Create data stream (データストリーミングの作成)] を選択し、指示に従って samplestream というストリーミングを作成します。

  3. DynamoDB コンソール (https://console.aws.amazon.com/dynamodb/) を開きます。

  4. コンソールの左側のナビゲーションペインで、[テーブル] を選択します。

  5. [Music] テーブルを選択します。

  6. [エクスポートとストリーム] タブを選択します。

  7. (オプション) [Amazon Kinesis データストリームの詳細] で、レコードのタイムスタンプの精度をマイクロ秒 (デフォルト) からミリ秒に変更できます。

  8. ドロップダウンリストから samplestream を選択します。

  9. [オンにする] ボタンを選択します。

AWS CLI
  1. create-stream コマンドを使用して、samplestream という名前の Kinesis データストリームを作成します。

    aws kinesis create-stream --stream-name samplestream --shard-count 3

    Kinesis データストリームのシャード数を設定する前に「Kinesis Data Streams のシャード管理に関する考慮事項」を参照してください。

  2. Kinesis ストリームがアクティブで、使用できる状態になっていることを確認するには、describe-stream コマンドを使用します。

    aws kinesis describe-stream --stream-name samplestream
  3. DynamoDB enable-kinesis-streaming-destination コマンドを使用して、DynamoDB テーブルで Kinesis ストリーミングを有効にします。stream-arn の値を、前のステップの describe-stream によって返された値で置き換えます。オプションで、各レコードに返されるタイムスタンプ値の精度をより細かく (マイクロ秒) したストリーミングを有効にします。

    マイクロ秒のタイムスタンプ精度でのストリーミングを有効にします。

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND

    または、デフォルトのタイムスタンプ精度 (ミリ秒) でのストリーミングを有効にします。

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
  4. DynamoDB describe-kinesis-streaming-destination コマンドを使用して、テーブルで Kinesis ストリーミングがアクティブかどうかを確認します。

    aws dynamodb describe-kinesis-streaming-destination --table-name Music
  5. DynamoDB デベロッパーガイド」の説明通りに、put-item コマンドを使用して DynamoDB テーブルにデータを書き込みます。

    aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
  6. Kinesis get-records CLI コマンドを使用して、Kinesis ストリームコンテンツを取得します。次に、以下のコードスニペットを使用して、ストリーミングコンテンツを逆シリアル化します。

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }
Java
  1. 「Kinesis Data Streams デベロッパーガイド」の指示に従って、Java を使用し samplestream という名前の Kinesis データストリームを作成します。

    Kinesis データストリームのシャード数を設定する前に「Kinesis Data Streams のシャード管理に関する考慮事項」を参照してください。

  2. 次のコードスニペットを使用して、DynamoDB テーブルで Kinesis ストリーミングを有効にします。オプションで、各レコードに返されるタイムスタンプ値の精度をより細かく (マイクロ秒) したストリーミングを有効にします。

    マイクロ秒のタイムスタンプ精度でのストリーミングを有効にします。

    EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder() .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND) .build(); EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .enableKinesisStreamingConfiguration(enableKdsConfig) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);

    または、デフォルトのタイムスタンプ精度 (ミリ秒) でのストリーミングを有効にします。

    EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
  3. 「Kinesis Data Streams デベロッパーガイド」の指示に従って、作成したデータストリームから読み込みます。

  4. 次のコードスニペットを使用して、ストリームコンテンツを逆シリアル化します。

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }

アクティブな Amazon Kinesis データストリームに変更を加える

このセクションでは、コンソール、AWS CLI、API を使用して DynamoDB 用 Kinesis Data Streams のセットアップを変更する方法について説明します。

AWS Management Console

  1. DynamoDB コンソール (https://console.aws.amazon.com/dynamodb/) を開きます。

  2. テーブルに移動します。

  3. [エクスポートおよびストリーム] タブを選択します。

AWS CLI

  1. describe-kinesis-streaming-destination を呼び出して、ストリームが ACTIVE であることを確認します。

  2. 次の例のように、UpdateKinesisStreamingDestination を呼び出します。

    aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
  3. describe-kinesis-streaming-destination を呼び出して、ストリームが UPDATING であることを確認します。

  4. ストリーミングステータスが再び ACTIVE になるまで describe-kinesis-streaming-destination を定期的に呼び出します。タイムスタンプの精度の更新が有効になるまで、最大 5 分かかることがあります。このステータスが更新されると、更新が完了したことを表し、新しい精度値が今後のレコードに適用されます。

  5. putItem を使用してテーブルに書き込みます。

  6. Kinesis get-records コマンドを使用して、Kinesis ストリームコンテンツを取得します。

  7. 書き込みの ApproximateCreationDateTime の精度が希望どおりであることを確認します。

Java API

  1. UpdateKinesisStreamingDestination リクエストと UpdateKinesisStreamingDestination レスポンスを構成するコードスニペットを提供します。

  2. DescribeKinesisStreamingDestination リクエストと DescribeKinesisStreamingDestination response を構成するコードスニペットを提供します。

  3. ストリーミングのステータスが、更新が完了し、将来のレコードに新しい精度値が適用されることを示す ACTIVE に戻るまで、describe-kinesis-streaming-destination を定期的に呼び出します。

  4. テーブルへの書き込みを実行します。

  5. ストリームから読み取り、ストリームコンテンツを逆シリアル化します。

  6. 書き込みの ApproximateCreationDateTime の精度が希望どおりであることを確認します。