メニュー
Amazon DynamoDB
開発者ガイド (API Version 2012-08-10)

チュートリアル : DynamoDB ストリーム の低レベルの API

このセクションは、動作中の DynamoDB ストリーム を示す Java プログラムのチュートリアルです。 ソースコードについては、「完成したプログラム: 低レベルの DynamoDB ストリーム API」を参照してください。

このプログラムでは、次のような処理を実行します。

  1. ストリームが有効になった DynamoDB テーブルを作成します。

  2. このテーブルのストリーム設定を記述します。

  3. テーブル内のデータを変更します。

  4. ストリーム内のシャードを記述します。

  5. シャードからストリームレコードを読み込みます。

  6. クリーンアップ.

注記

このコードではすべての例外は処理されず、高トラフィックの条件下では確実に動作しません。DynamoDB からストリームレコードを使用する推奨の方法は、「DynamoDB ストリーム Kinesis Adapter を使用したストリームレコードの処理」で説明しているように、Kinesis Client Library (KCL) を使用し、Kinesis Adapter を介して行うことです。

これらのステップについては次のセクションで説明します。完成したアプリケーションは、チュートリアルの最後に示します。

ステップ 1: ストリームが有効になったテーブルを作成する

最初のステップでは、次のコードスニペットに示すように、DynamoDB にテーブルを作成します。テーブルではストリームが有効になっています。ストリームは、変更された各項目の NEW_AND_OLD_IMAGES をキャプチャします。

Copy
ArrayList<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition() .withAttributeName("Id") .withAttributeType("N")); ArrayList<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement() .withAttributeName("Id") .withKeyType(KeyType.HASH)); //Partition key StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES); CreateTableRequest createTableRequest = new CreateTableRequest() .withTableName(tableName) .withKeySchema(keySchema) .withAttributeDefinitions(attributeDefinitions) .withProvisionedThroughput(new ProvisionedThroughput() .withReadCapacityUnits(1L) .withWriteCapacityUnits(1L)) .withStreamSpecification(streamSpecification);

ステップ 2: テーブルのストリーム設定を記述する

DescribeTable API を使用すると、テーブルの現在のストリーム設定を表示することができます。 次のコードスニペットは、ストリームが有効になっていることと、適切なデータがキャプチャされることを確認するのに役立ちます。

Copy
DescribeTableResult describeTableResult = dynamoDBClient.describeTable(tableName); String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); StreamSpecification myStreamSpec = describeTableResult.getTable().getStreamSpecification(); System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn); System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled()); System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());

ステップ 3: テーブル内のデータを変更する

次のステップでは、テーブル内のデータにいくつかの変更を加えます。次のコードスニペットは、テーブルに新しい項目を追加して、その項目の属性を更新し、項目を削除します。

Copy
// Add a new item int numChanges = 0; System.out.println("Making some changes to table data"); Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); item.put("Id", new AttributeValue().withN("101")); item.put("Message", new AttributeValue().withS("New item!")); dynamoDBClient.putItem(tableName, item); numChanges++; // Update the item Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); key.put("Id", new AttributeValue().withN("101")); Map<String, AttributeValueUpdate> attributeUpdates = new HashMap<String, AttributeValueUpdate>(); attributeUpdates.put("Message", new AttributeValueUpdate() .withAction(AttributeAction.PUT) .withValue(new AttributeValue().withS("This item has changed"))); dynamoDBClient.updateItem(tableName, key, attributeUpdates); numChanges++; // Delete the item dynamoDBClient.deleteItem(tableName, key); numChanges++;

ステップ 4: ストリーム内のシャードを記述する

テーブル内のデータを変更すると、ストリームレコードがテーブルのストリームに書き込まれます。

ステップ 2: テーブルのストリーム設定を記述する」では、現在のストリーム ARN を調べ、それを変数 myStreamArn に割り当てました。これを DescribeStream アクションで使用すると、ストリーム内のシャードを取得できます。

DynamoDB テーブル内のデータを大きく変更したわけではないため、リストにはシャードが 1 つしかありません。次のコードスニペットは、この情報を取得する方法を示しています。

Copy
DescribeStreamResult describeStreamResult = streamsClient.describeStream(new DescribeStreamRequest() .withStreamArn(myStreamArn)); String streamArn = describeStreamResult.getStreamDescription().getStreamArn(); List<Shard> shards = describeStreamResult.getStreamDescription().getShards();

ステップ 5: ストリーム レコードを読み込む

リスト内のシャードごとに、シャードイテレーターを取得し、そのイテレーターを使用することでストリームレコードを取得して印刷します。

次のコードスニペットは、シャード 1 つしかない場合でもループを使用してシャードリストを処理します。

Copy
for (Shard shard : shards) { String shardId = shard.getShardId(); System.out.println( "Processing " + shardId + " from stream "+ streamArn); // Get an iterator for the current shard GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest() .withStreamArn(myStreamArn) .withShardId(shardId) .withShardIteratorType(ShardIteratorType.TRIM_HORIZON); GetShardIteratorResult getShardIteratorResult = streamsClient.getShardIterator(getShardIteratorRequest); String nextItr = getShardIteratorResult.getShardIterator(); while (nextItr != null && numChanges > 0) { // Use the iterator to read the data records from the shard GetRecordsResult getRecordsResult = streamsClient.getRecords(new GetRecordsRequest(). withShardIterator(nextItr)); List<Record> records = getRecordsResult.getRecords(); System.out.println("Getting records..."); for (Record record : records) { System.out.println(record); numChanges--; } nextItr = getRecordsResult.getNextShardIterator(); } }

ステップ 6: クリーンアップ

デモが完了したため、テーブルを削除できます。テーブルが削除されても、このテーブルに関連付けられたストリームは引き続き読み込み可能である点に注意してください。ストリームは 24 時間後に自動的に削除されます。

Copy
dynamoDBClient.deleteTable(tableName);