Amazon DynamoDB
Developer Guide (API Version 2012-08-10)

Walkthrough: DynamoDB Streams Low-Level API

This section is a walkthrough of a Java program that shows DynamoDB Streams in action. For the source code, see Complete Program: Low-Level DynamoDB Streams API.

The program does the following:

  1. Creates a DynamoDB table with a stream enabled.

  2. Describes the stream settings for this table.

  3. Modifies data in the table.

  4. Describes the shards in the stream.

  5. Reads the stream records from the shards.

  6. Cleans up.


This code does not handle all exceptions and will not work reliably under high-traffic conditions. The recommended way to consume stream records from DynamoDB is through the DynamoDB Streams Kinesis Adapter, using the Amazon Kinesis Client Library (KCL), as described in Using the DynamoDB Streams Kinesis Adapter to Process Stream Records.

These steps are described in the following sections, and the complete application is shown at the end of the walkthrough.

Step 1: Create a Table with a Stream Enabled

The first step is to create a table in DynamoDB, as in the following code snippet. The table has a stream enabled with the NEW_AND_OLD_IMAGES view type, so the stream captures both the old and the new image of each item that is modified.

    ArrayList<AttributeDefinition> attributeDefinitions =
        new ArrayList<AttributeDefinition>();
    attributeDefinitions.add(new AttributeDefinition()
        .withAttributeName("Id")
        .withAttributeType("N"));

    ArrayList<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
    keySchema.add(new KeySchemaElement()
        .withAttributeName("Id")
        .withKeyType(KeyType.HASH)); // Partition key

    // Enable the stream and capture both the old and new item images
    StreamSpecification streamSpecification = new StreamSpecification();
    streamSpecification.setStreamEnabled(true);
    streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);

    CreateTableRequest createTableRequest = new CreateTableRequest()
        .withTableName(tableName)
        .withKeySchema(keySchema)
        .withAttributeDefinitions(attributeDefinitions)
        .withProvisionedThroughput(new ProvisionedThroughput()
            .withReadCapacityUnits(1L)
            .withWriteCapacityUnits(1L))
        .withStreamSpecification(streamSpecification);

    // Send the request to create the table
    dynamoDBClient.createTable(createTableRequest);
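To make the effect of the stream view type more concrete, the following is a minimal sketch in plain Java (it makes no AWS SDK calls) that models which parts of a change a stream record would carry under each of the four view types. The class StreamViewSketch, its local ViewType enum, and the imagesFor method are hypothetical names used only for illustration; they are not part of the SDK.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class StreamViewSketch {

    // Local stand-in for the four stream view types; not the SDK's StreamViewType.
    enum ViewType { KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES }

    /**
     * Returns the parts of a change that a record would carry for the given view type.
     * oldImage is null for a newly created item; newImage is null for a deleted item.
     */
    static Map<String, Object> imagesFor(ViewType type,
                                         Map<String, String> keys,
                                         Map<String, String> oldImage,
                                         Map<String, String> newImage) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("Keys", keys); // the key attributes are present for every view type

        boolean wantsOld = type == ViewType.OLD_IMAGE || type == ViewType.NEW_AND_OLD_IMAGES;
        boolean wantsNew = type == ViewType.NEW_IMAGE || type == ViewType.NEW_AND_OLD_IMAGES;

        if (wantsOld && oldImage != null) {
            record.put("OldImage", oldImage);
        }
        if (wantsNew && newImage != null) {
            record.put("NewImage", newImage);
        }
        return record;
    }

    // Builds a small item with the two attributes used in this walkthrough.
    private static Map<String, String> item(String id, String message) {
        Map<String, String> result = new LinkedHashMap<>();
        result.put("Id", id);
        result.put("Message", message);
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> keys = Collections.singletonMap("Id", "101");
        Map<String, String> before = item("101", "New item!");
        Map<String, String> after = item("101", "This item has changed");

        // Show what an update to item 101 would look like under each view type.
        for (ViewType type : ViewType.values()) {
            System.out.println(type + " -> " + imagesFor(type, keys, before, after));
        }
    }
}
```

Under this model, only NEW_AND_OLD_IMAGES yields both images for an update, which is why the walkthrough uses it to show the full before-and-after state of each change.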

Step 2: Describe the Stream Settings for the Table

The DescribeTable API lets you view the current stream settings for a table. The following code snippet helps confirm that the stream is enabled, and that it will capture the correct data.

    DescribeTableResult describeTableResult = dynamoDBClient.describeTable(tableName);

    String myStreamArn = describeTableResult.getTable().getLatestStreamArn();
    StreamSpecification myStreamSpec =
        describeTableResult.getTable().getStreamSpecification();

    System.out.println("Current stream ARN for " + tableName + ": " + myStreamArn);
    System.out.println("Stream enabled: " + myStreamSpec.getStreamEnabled());
    System.out.println("Update view type: " + myStreamSpec.getStreamViewType());

Step 3: Modify Data in the Table

The next step is to make some changes to the data in the table. The following code snippet adds a new item to the table, updates an attribute in that item, and then deletes the item.

    int numChanges = 0;
    System.out.println("Making some changes to table data");

    // Add a new item
    Map<String, AttributeValue> item = new HashMap<String, AttributeValue>();
    item.put("Id", new AttributeValue().withN("101"));
    item.put("Message", new AttributeValue().withS("New item!"));
    dynamoDBClient.putItem(tableName, item);
    numChanges++;

    // Update the item
    Map<String, AttributeValue> key = new HashMap<String, AttributeValue>();
    key.put("Id", new AttributeValue().withN("101"));
    Map<String, AttributeValueUpdate> attributeUpdates =
        new HashMap<String, AttributeValueUpdate>();
    attributeUpdates.put("Message", new AttributeValueUpdate()
        .withAction(AttributeAction.PUT)
        .withValue(new AttributeValue().withS("This item has changed")));
    dynamoDBClient.updateItem(tableName, key, attributeUpdates);
    numChanges++;

    // Delete the item
    dynamoDBClient.deleteItem(tableName, key);
    numChanges++;
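Each of the three writes above should surface in the stream as one record. The following plain-Java sketch (no AWS SDK calls) models how the event name of a record follows from whether the item existed before and after the write. Only the event names INSERT, MODIFY, and REMOVE come from DynamoDB Streams; the class ChangeEventSketch, its in-memory table, and its method names are hypothetical, and the update is modeled as a simple overwrite.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangeEventSketch {

    // In-memory stand-in for the table, keyed by the Id attribute.
    private final Map<String, Map<String, String>> table = new HashMap<>();
    // Event names recorded in the order the changes were made.
    private final List<String> events = new ArrayList<>();

    /** Maps the before/after existence of an item to a stream event name. */
    static String eventNameFor(boolean existedBefore, boolean existsAfter) {
        if (!existedBefore && existsAfter) {
            return "INSERT";
        }
        if (existedBefore && existsAfter) {
            return "MODIFY";
        }
        if (existedBefore) {
            return "REMOVE";
        }
        return null; // nothing existed before or after, so nothing changed
    }

    /** Creates or overwrites an item; a write that changes nothing records no event. */
    void put(String id, Map<String, String> item) {
        Map<String, String> old = table.get(id);
        if (item.equals(old)) {
            return; // no data changed, so no stream record in this model
        }
        table.put(id, new HashMap<>(item));
        record(id, old != null, true);
    }

    /** Deletes an item if it exists. */
    void delete(String id) {
        boolean existed = table.remove(id) != null;
        record(id, existed, false);
    }

    private void record(String id, boolean existedBefore, boolean existsAfter) {
        String name = eventNameFor(existedBefore, existsAfter);
        if (name != null) {
            events.add(name + " Id=" + id);
        }
    }

    /** Replays the three changes from this step and returns the recorded events. */
    static List<String> demo() {
        ChangeEventSketch sketch = new ChangeEventSketch();
        Map<String, String> item = new HashMap<>();
        item.put("Message", "New item!");
        sketch.put("101", item);                       // add a new item
        item.put("Message", "This item has changed");
        sketch.put("101", item);                       // update the item
        sketch.delete("101");                          // delete the item
        return sketch.events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running main prints the three event names in the order the changes were made: INSERT, MODIFY, and REMOVE for item 101.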

Step 4: Describe the Shards in the Stream

The data modifications in the table will result in stream records being written to the table's stream.

In Step 2: Describe the Stream Settings for the Table, we determined the current stream ARN and assigned it to the variable myStreamArn. We can use this with the DescribeStream action to obtain the shards in the stream.

Because we modified very little data in the DynamoDB table, the list will contain only one shard. The following code snippet shows how to obtain this information.

    DescribeStreamResult describeStreamResult =
        streamsClient.describeStream(new DescribeStreamRequest()
            .withStreamArn(myStreamArn));

    String streamArn = describeStreamResult.getStreamDescription().getStreamArn();
    List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
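The snippet above reads a single DescribeStream response, which is enough for the one shard in this walkthrough. For a stream with many shards, DescribeStream returns the shard list in pages: a response that includes a LastEvaluatedShardId has more shards to return, and that value is passed as ExclusiveStartShardId on the next call. The following plain-Java sketch models that paging loop without calling the SDK. The class ShardPagingSketch, the StreamDescriber interface, and the in-memory shard IDs are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShardPagingSketch {

    /** One page of shard IDs, plus the last evaluated shard ID (null on the final page). */
    static final class ShardPage {
        final List<String> shardIds;
        final String lastEvaluatedShardId;

        ShardPage(List<String> shardIds, String lastEvaluatedShardId) {
            this.shardIds = shardIds;
            this.lastEvaluatedShardId = lastEvaluatedShardId;
        }
    }

    /** Hypothetical stand-in for a DescribeStream call that takes an exclusive start shard ID. */
    interface StreamDescriber {
        ShardPage describe(String exclusiveStartShardId);
    }

    /** Collects shard IDs from every page until no last evaluated shard ID is returned. */
    static List<String> listAllShards(StreamDescriber describer) {
        List<String> all = new ArrayList<>();
        String start = null; // null requests the first page
        do {
            ShardPage page = describer.describe(start);
            all.addAll(page.shardIds);
            start = page.lastEvaluatedShardId;
        } while (start != null);
        return all;
    }

    /** Runs the loop against an in-memory describer that returns two shards per page. */
    static List<String> demo() {
        List<String> shards = Arrays.asList("shard-1", "shard-2", "shard-3");
        int pageSize = 2;
        StreamDescriber fake = start -> {
            int from = (start == null) ? 0 : shards.indexOf(start) + 1;
            int to = Math.min(from + pageSize, shards.size());
            String last = (to < shards.size()) ? shards.get(to - 1) : null;
            return new ShardPage(shards.subList(from, to), last);
        };
        return listAllShards(fake);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a real client, the body of describe would issue the DescribeStream request and read the shards and the last evaluated shard ID from the returned stream description.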

Step 5: Read the Stream Records

For each shard in the list, we obtain a shard iterator, and then use the iterator to obtain the stream records and print them.

The following code snippet uses a loop to process the shard list, even though there is only one shard.

    for (Shard shard : shards) {
        String shardId = shard.getShardId();
        System.out.println("Processing " + shardId + " from stream " + streamArn);

        // Get an iterator for the current shard
        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
            .withStreamArn(myStreamArn)
            .withShardId(shardId)
            .withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
        GetShardIteratorResult getShardIteratorResult =
            streamsClient.getShardIterator(getShardIteratorRequest);
        String nextItr = getShardIteratorResult.getShardIterator();

        while (nextItr != null && numChanges > 0) {
            // Use the iterator to read the data records from the shard
            GetRecordsResult getRecordsResult =
                streamsClient.getRecords(new GetRecordsRequest()
                    .withShardIterator(nextItr));
            List<Record> records = getRecordsResult.getRecords();
            System.out.println("Getting records...");
            for (Record record : records) {
                System.out.println(record);
                numChanges--;
            }
            nextItr = getRecordsResult.getNextShardIterator();
        }
    }
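The loop above relies on the numChanges counter to know when to stop, and it calls GetRecords again immediately when a response is empty. As one possible variation, the following plain-Java sketch pauses after each empty response and gives up after a fixed number of consecutive empty responses, or stops when the next iterator is null (the shard is closed and fully read). It does not call the SDK: the class ShardPollingSketch, the ShardReader interface, the Page type, and the maxEmptyPolls and pauseMillis parameters are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ShardPollingSketch {

    /** One page of results: the records, plus the iterator for the next call (null when the shard is closed). */
    static final class Page {
        final List<String> records;
        final String nextIterator;

        Page(List<String> records, String nextIterator) {
            this.records = records;
            this.nextIterator = nextIterator;
        }
    }

    /** Hypothetical stand-in for a GetRecords call. */
    interface ShardReader {
        Page read(String iterator);
    }

    /**
     * Reads pages until the iterator is null or maxEmptyPolls consecutive
     * pages come back empty, pausing pauseMillis after each empty page.
     */
    static List<String> drain(ShardReader reader, String firstIterator,
                              int maxEmptyPolls, long pauseMillis) {
        List<String> all = new ArrayList<>();
        String iterator = firstIterator;
        int emptyPolls = 0;
        while (iterator != null && emptyPolls < maxEmptyPolls) {
            Page page = reader.read(iterator);
            if (page.records.isEmpty()) {
                emptyPolls++;
                try {
                    Thread.sleep(pauseMillis); // back off before polling again
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop
                    break;
                }
            } else {
                emptyPolls = 0;
                all.addAll(page.records);
            }
            iterator = page.nextIterator;
        }
        return all;
    }

    /** Runs the loop against an in-memory shard with one empty page in the middle. */
    static List<String> demo() {
        List<List<String>> pages = Arrays.asList(
            Arrays.asList("r1", "r2"),
            Collections.<String>emptyList(),
            Arrays.asList("r3"));
        // The fake iterator is simply the index of the page to read next.
        ShardReader fake = iterator -> {
            int index = Integer.parseInt(iterator);
            String next = (index + 1 < pages.size()) ? String.valueOf(index + 1) : null;
            return new Page(pages.get(index), next);
        };
        return drain(fake, "0", 3, 10L);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The limit on empty polls is a design choice for short demos like this one. A long-running consumer would typically keep polling an open shard, which is part of what the KCL-based approach recommended earlier handles for you.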

Step 6: Clean Up

The demo is complete, so we can delete the table. Note that the stream associated with this table will remain available for reading, even though the table is deleted. The stream will be automatically deleted after 24 hours.