Amazon DynamoDB 用 Kinesis Data Streams の開始方法
このセクションでは、Amazon DynamoDB コンソール、AWS Command Line Interface (AWS CLI)、および API を活用して Amazon DynamoDB 用の Amazon Kinesis Data Streams テーブルを使用する方法について説明します。
これらの例はすべて、DynamoDB の使用開始チュートリアルの一部として作成された Music
DynamoDB テーブルを使用しています。
コンシューマーを構築し、Kinesis データストリームを他の AWS のサービスに接続する詳細方法については、「Amazon Kinesis Data Streams デベロッパーガイド」の「Kinesis Data Streams からのデータの読み込み」を参照してください。
注記
KDS シャードを初めて使用するときは、使用パターンに合わせてシャードをスケールアップまたはスケールダウンするように設定することをお勧めします。使用パターンに関するデータをさらに蓄積したら、それに合わせてストリーム内のシャードを調整できます。
- Console
-
-
AWS Management Console にサインインし、Kinesis コンソール (https://console.aws.amazon.com/kinesis/) を開きます。
-
[Create data stream (データストリーミングの作成)] を選択し、指示に従って
samplestream
というストリーミングを作成します。 -
https://console.aws.amazon.com/dynamodb/ で DynamoDB コンソールを開きます。
-
コンソールの左側のナビゲーションペインで、[テーブル] を選択します。
-
[Music] テーブルを選択します。
-
[エクスポートとストリーム] タブを選択します。
-
[Kinesis Data Streams 詳細] で、[有効化] ボタンを選択します。
-
ドロップダウンリストから samplestream を選択します。
-
[ストリーミングを有効化] ボタンを選択します。
-
- AWS CLI
-
-
create-stream コマンドを使用して、
samplestream
という名前の Kinesis データストリームを作成します。aws kinesis create-stream --stream-name samplestream --shard-count 3
Kinesis データストリームのシャード数を設定する前に「Kinesis Data Streams のシャード管理に関する考慮事項」を参照してください。
-
Kinesis ストリームがアクティブで、使用できる状態になっていることを確認するには、describe-stream コマンドを使用します。
aws kinesis describe-stream --stream-name samplestream
-
DynamoDB
enable-kinesis-streaming-destination
コマンドを使用して、DynamoDB テーブルで Kinesis ストリーミングを有効にします。stream-arn
の値を、前のステップのdescribe-stream
によって返された値で置き換えます。aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:123456789012:stream/samplestream
-
DynamoDB
describe-kinesis-streaming-destination
コマンドを使用して、テーブルで Kinesis ストリーミングがアクティブかどうかを確認します。aws dynamodb describe-kinesis-streaming-destination --table-name Music
-
「DynamoDB デベロッパーガイド」の説明通りに、
put-item
コマンドを使用して DynamoDB テーブルにデータを書き込みます。aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
-
Kinesis get-records CLI コマンドを使用して、Kinesis ストリームコンテンツを取得します。次に、以下のコードスニペットを使用して、ストリーミングコンテンツを逆シリアル化します。
/** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); return Instant.ofEpochMilli(timestampJson.longValue()); }
-
- Java
-
-
「Kinesis Data Streams デベロッパーガイド」の指示に従って、Java を使用し
samplestream
という名前の Kinesis データストリームを作成します。Kinesis データストリームのシャード数を設定する前に「Kinesis Data Streams のシャード管理に関する考慮事項」を参照してください。
-
次のコードスニペットを使用して、DynamoDB テーブルで Kinesis ストリーミングを有効にします。
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
-
「Kinesis Data Streams デベロッパーガイド」の指示に従って、作成したデータストリームから読み込みます。
-
次のコードスニペットを使用して、ストリームコンテンツを逆シリアル化します。
/** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); return Instant.ofEpochMilli(timestampJson.longValue()); }
-