Menu
Amazon DynamoDB
Developer Guide (API Version 2012-08-10)

Complete Program: Low-Level DynamoDB Streams API

Here is a complete Java program that performs the tasks described in this walkthrough. When you run it, you will see each stream record in its entirety:

Copy
Issuing CreateTable request for TestTableForStreams Waiting for TestTableForStreams to be created... Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-west-2:111122223333:table/TestTableForStreams/stream/2015-05-19T23:03:50.641 Stream enabled: true Update view type: NEW_AND_OLD_IMAGES Making some changes to table data Processing shardId-00000001415575208348-98d954b6 from stream arn:aws:dynamodb:us-west-2:111122223333:table/TestTableForStreams/stream/2015-05-19T23:03:50.641 Getting records... {eventID: 7f6ba6f037b9fdd5a43af22cb726f0cd,eventName: INSERT,eventVersion: 1.0,eventSource: aws:dynamodb,awsRegion: us-west-2,dynamodb: {Keys: {Id={N: 101,}},NewImage: {Message={S: New item!,}, Id={N: 101,}},SequenceNumber: 100000000000000507337,SizeBytes: 26,StreamViewType: NEW_AND_OLD_IMAGES}} {eventID: 8f546e78ab6183d1441c0680ec03dcfc,eventName: MODIFY,eventVersion: 1.0,eventSource: aws:dynamodb,awsRegion: us-west-2,dynamodb: {Keys: {Id={N: 101,}},NewImage: {Message={S: This item has changed,}, Id={N: 101,}},OldImage: {Message={S: New item!,}, Id={N: 101,}},SequenceNumber: 200000000000000507338,SizeBytes: 59,StreamViewType: NEW_AND_OLD_IMAGES}} {eventID: d9bb1e7a1684dfd66c8a3fb8ca2f6977,eventName: REMOVE,eventVersion: 1.0,eventSource: aws:dynamodb,awsRegion: us-west-2,dynamodb: {Keys: {Id={N: 101,}},OldImage: {Message={S: This item has changed,}, Id={N: 101,}},SequenceNumber: 300000000000000507339,SizeBytes: 38,StreamViewType: NEW_AND_OLD_IMAGES}} Deleting the table... Demo complete

Example

Copy
// Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. // Licensed under the Apache License, Version 2.0. package com.amazonaws.codesamples.gsg; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient; import com.amazonaws.services.dynamodbv2.model.AttributeAction; import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest; import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult; import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest; import com.amazonaws.services.dynamodbv2.model.GetRecordsResult; import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest; import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.KeyType; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; import com.amazonaws.services.dynamodbv2.model.Record; import com.amazonaws.services.dynamodbv2.model.Shard; import com.amazonaws.services.dynamodbv2.model.ShardIteratorType; import com.amazonaws.services.dynamodbv2.model.StreamSpecification; import com.amazonaws.services.dynamodbv2.model.StreamViewType; import com.amazonaws.services.dynamodbv2.util.Tables; public class StreamsLowLevelDemo { private static AmazonDynamoDBClient dynamoDBClient = new AmazonDynamoDBClient(new ProfileCredentialsProvider()); private static AmazonDynamoDBStreamsClient streamsClient = new AmazonDynamoDBStreamsClient( new ProfileCredentialsProvider()); public static void main(String args[]) { dynamoDBClient.setEndpoint("DYNAMODB_ENDPOINT_GOES_HERE"); streamsClient.setEndpoint("STREAMS_ENDPOINT_GOES_HERE"); // Create the table String tableName = "TestTableForStreams"; ArrayList<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); ArrayList<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withKeySchema(keySchema).withAttributeDefinitions(attributeDefinitions) .withProvisionedThroughput(new ProvisionedThroughput().withReadCapacityUnits(1L).withWriteCapacityUnits(1L)) .withStreamSpecification(streamSpecification); System.out.println("Issuing CreateTable request for " + tableName); dynamoDBClient.createTable(createTableRequest); System.out.println("Waiting for " + tableName + " to be created..."); try { Tables.awaitTableToBecomeActive(dynamoDBClient, tableName); } catch (InterruptedException e) { e.printStackTrace(); } // Determine the Streams settings for the table DescribeTableResult describeTableResult = dynamoDBClient.describeTable(tableName); String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); StreamSpecification myStreamSpec = describeTableResult.getTable().getStreamSpecification(); System.out.println("Current stream ARN for " + tableName + ": " + myStreamArn); System.out.println("Stream enabled: " + myStreamSpec.getStreamEnabled()); System.out.println("Update view type: " + myStreamSpec.getStreamViewType()); // Add a new item int numChanges = 0; System.out.println("Making some changes to table data"); Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); item.put("Id", new AttributeValue().withN("101")); item.put("Message", new AttributeValue().withS("New item!")); dynamoDBClient.putItem(tableName, item); numChanges++; // Update the item Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); key.put("Id", new AttributeValue().withN("101")); Map<String, AttributeValueUpdate> attributeUpdates = new HashMap<String, AttributeValueUpdate>(); attributeUpdates.put("Message", new AttributeValueUpdate().withAction(AttributeAction.PUT) .withValue(new AttributeValue().withS("This item has changed"))); dynamoDBClient.updateItem(tableName, key, attributeUpdates); numChanges++; // Delete the item dynamoDBClient.deleteItem(tableName, key); numChanges++; // Get the shards in the stream DescribeStreamResult describeStreamResult = streamsClient .describeStream(new DescribeStreamRequest().withStreamArn(myStreamArn)); String streamArn = describeStreamResult.getStreamDescription().getStreamArn(); List<Shard> shards = describeStreamResult.getStreamDescription().getShards(); // Process each shard for (Shard shard : shards) { String shardId = shard.getShardId(); System.out.println("Processing " + shardId + " from stream " + streamArn); // Get an iterator for the current shard GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest().withStreamArn(myStreamArn) .withShardId(shardId).withShardIteratorType(ShardIteratorType.TRIM_HORIZON); GetShardIteratorResult getShardIteratorResult = streamsClient.getShardIterator(getShardIteratorRequest); String nextItr = getShardIteratorResult.getShardIterator(); while (nextItr != null && numChanges > 0) { // Use the iterator to read the data records from the shard GetRecordsResult getRecordsResult = streamsClient .getRecords(new GetRecordsRequest().withShardIterator(nextItr)); List<Record> records = getRecordsResult.getRecords(); System.out.println("Getting records..."); for (Record record : records) { System.out.println(record); numChanges--; } nextItr = getRecordsResult.getNextShardIterator(); } // Delete the table System.out.println("Deleting the table..."); dynamoDBClient.deleteTable(tableName); System.out.println("Demo complete"); } } }