Amazon DynamoDB 用 Kinesis Data Streams の開始方法
このセクションでは、Amazon DynamoDB コンソール、AWS Command Line Interface (AWS CLI)、および API を活用して Amazon DynamoDB 用の Amazon Kinesis Data Streams テーブルを使用する方法について説明します。
これらの例はすべて、DynamoDB の使用開始チュートリアルの一部として作成された Music
DynamoDB テーブルを使用しています。
コンシューマーを構築し、Kinesis データストリームを他の AWS のサービスに接続する詳細方法については、「Amazon Kinesis Data Streams デベロッパーガイド」の「Kinesis Data Streams からのデータの読み込み」を参照してください。
KDS シャードを初めて使用するときは、使用パターンに合わせてシャードをスケールアップまたはスケールダウンするように設定することをお勧めします。使用パターンに関するデータをさらに蓄積したら、それに合わせてストリーム内のシャードを調整できます。
- Console
-
-
AWS Management Console にサインインし、Kinesis コンソール (https://console.aws.amazon.com/kinesis/) を開きます。
-
[Create data stream (データストリーミングの作成)] を選択し、指示に従って samplestream
というストリーミングを作成します。
-
DynamoDB コンソール (https://console.aws.amazon.com/dynamodb/) を開きます。
-
コンソールの左側のナビゲーションペインで、[テーブル] を選択します。
-
[Music] テーブルを選択します。
-
[エクスポートとストリーム] タブを選択します。
-
(オプション) [Amazon Kinesis データストリームの詳細] で、レコードのタイムスタンプの精度をマイクロ秒 (デフォルト) からミリ秒に変更できます。
-
ドロップダウンリストから samplestream を選択します。
-
[オンにする] ボタンを選択します。
- AWS CLI
-
-
create-stream コマンドを使用して、samplestream
という名前の Kinesis データストリームを作成します。
aws kinesis create-stream --stream-name samplestream --shard-count 3
Kinesis データストリームのシャード数を設定する前に「Kinesis Data Streams のシャード管理に関する考慮事項」を参照してください。
-
Kinesis ストリームがアクティブで、使用できる状態になっていることを確認するには、describe-stream コマンドを使用します。
aws kinesis describe-stream --stream-name samplestream
-
DynamoDB enable-kinesis-streaming-destination
コマンドを使用して、DynamoDB テーブルで Kinesis ストリーミングを有効にします。stream-arn
の値を、前のステップの describe-stream
によって返された値で置き換えます。オプションで、各レコードに返されるタイムスタンプ値の精度をより細かく (マイクロ秒) したストリーミングを有効にします。
マイクロ秒のタイムスタンプ精度でのストリーミングを有効にします。
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
--enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
または、デフォルトのタイムスタンプ精度 (ミリ秒) でのストリーミングを有効にします。
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
-
DynamoDB describe-kinesis-streaming-destination
コマンドを使用して、テーブルで Kinesis ストリーミングがアクティブかどうかを確認します。
aws dynamodb describe-kinesis-streaming-destination --table-name Music
-
「DynamoDB デベロッパーガイド」の説明通りに、put-item
コマンドを使用して DynamoDB テーブルにデータを書き込みます。
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
-
Kinesis get-records CLI コマンドを使用して、Kinesis ストリームコンテンツを取得します。次に、以下のコードスニペットを使用して、ストリーミングコンテンツを逆シリアル化します。
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we want to fetch the value
* of this attribute from the new item image. The following code fetches this value.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
- Java
-
-
「Kinesis Data Streams デベロッパーガイド」の指示に従って、Java を使用し samplestream
という名前の Kinesis データストリームを作成します。
Kinesis データストリームのシャード数を設定する前に「Kinesis Data Streams のシャード管理に関する考慮事項」を参照してください。
-
次のコードスニペットを使用して、DynamoDB テーブルで Kinesis ストリーミングを有効にします。オプションで、各レコードに返されるタイムスタンプ値の精度をより細かく (マイクロ秒) したストリーミングを有効にします。
マイクロ秒のタイムスタンプ精度でのストリーミングを有効にします。
EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
.approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
.build();
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.enableKinesisStreamingConfiguration(enableKdsConfig)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
または、デフォルトのタイムスタンプ精度 (ミリ秒) でのストリーミングを有効にします。
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
-
「Kinesis Data Streams デベロッパーガイド」の指示に従って、作成したデータストリームから読み込みます。
-
次のコードスニペットを使用して、ストリームコンテンツを逆シリアル化します。
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
* of this attribute from the new item image, the below code would fetch this.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
アクティブな Amazon Kinesis データストリームに変更を加える
このセクションでは、コンソール、AWS CLI、API を使用して DynamoDB 用 Kinesis Data Streams のセットアップを変更する方法について説明します。
AWS Management Console
AWS CLI
describe-kinesis-streaming-destination
を呼び出して、ストリームが ACTIVE
であることを確認します。
次の例のように、UpdateKinesisStreamingDestination
を呼び出します。
aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
describe-kinesis-streaming-destination
を呼び出して、ストリームが UPDATING
であることを確認します。
ストリーミングステータスが再び ACTIVE
になるまで describe-kinesis-streaming-destination
を定期的に呼び出します。タイムスタンプの精度の更新が有効になるまで、最大 5 分かかることがあります。このステータスが更新されると、更新が完了したことを表し、新しい精度値が今後のレコードに適用されます。
putItem
を使用してテーブルに書き込みます。
Kinesis get-records
コマンドを使用して、Kinesis ストリームコンテンツを取得します。
書き込みの ApproximateCreationDateTime
の精度が希望どおりであることを確認します。
Java API
UpdateKinesisStreamingDestination
リクエストと UpdateKinesisStreamingDestination
レスポンスを構成するコードスニペットを提供します。
DescribeKinesisStreamingDestination
リクエストと DescribeKinesisStreamingDestination response
を構成するコードスニペットを提供します。
ストリーミングのステータスが、更新が完了し、将来のレコードに新しい精度値が適用されることを示す ACTIVE
に戻るまで、describe-kinesis-streaming-destination
を定期的に呼び出します。
テーブルへの書き込みを実行します。
ストリームから読み取り、ストリームコンテンツを逆シリアル化します。
書き込みの ApproximateCreationDateTime
の精度が希望どおりであることを確認します。