
Complete Program: DynamoDB Streams Kinesis Adapter

Here is the complete Java program that performs the tasks described in this walkthrough. When you run it, you should see output similar to the following:

Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.

Important

To run this program, make sure that the client application has IAM policies that grant it access to Amazon DynamoDB and Amazon CloudWatch. For more information, see Using Identity-Based Policies (IAM Policies) for Amazon DynamoDB.
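The following is a minimal sketch of such a policy, shown only for illustration; it is not the exact policy from the linked topic. It assumes broad "Resource": "*" entries, which you should scope down to your own tables, the stream of the source table, and the lease table that the Kinesis Client Library (KCL) creates (named after the application, streams-adapter-demo in this demo).

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DemoTablesAndKCLLeaseTable",
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DeleteTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:Scan"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DynamoDBStreamsRead",
      "Effect": "Allow",
      "Action": [
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:ListStreams"
      ],
      "Resource": "*"
    },
    {
      "Sid": "KCLCloudWatchMetrics",
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": "*"
    }
  ]
}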

The source code consists of the following four .java files:

StreamsAdapterDemo.java: the main program. It creates the source and destination tables, configures the DynamoDB Streams Kinesis Adapter client and the KCL worker, runs the worker for a short time, and then compares the contents of the two tables.

StreamsRecordProcessor.java: the KCL record processor. It applies each stream record to the destination table (a put for INSERT and MODIFY events, a delete for REMOVE events) and checkpoints periodically.

StreamsRecordProcessorFactory.java: creates a StreamsRecordProcessor for each shard that the worker processes.

StreamsAdapterDemoHelper.java: helper methods that wrap the low-level DynamoDB calls used by the demo (creating, describing, and scanning tables, and putting, updating, and deleting items).

StreamsAdapterDemo.java

// Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Licensed under the Apache License, Version 2.0.
package com.amazonaws.codesamples.gsg;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

public class StreamsAdapterDemo {

    private static Worker worker;
    private static KinesisClientLibConfiguration workerConfig;
    private static IRecordProcessorFactory recordProcessorFactory;

    private static AmazonDynamoDBStreamsAdapterClient adapterClient;
    private static AWSCredentialsProvider streamsCredentials;

    private static AmazonDynamoDBClient dynamoDBClient;
    private static AWSCredentialsProvider dynamoDBCredentials;

    private static AmazonCloudWatchClient cloudWatchClient;

    private static String serviceName = "dynamodb";
    private static String dynamodbEndpoint = "DYNAMODB_ENDPOINT_GOES_HERE";
    private static String streamsEndpoint = "STREAMS_ENDPOINT_GOES_HERE";
    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    /**
     * @param args
     */
    public static void main(String[] args) throws Exception {
        System.out.println("Starting demo...");

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamsCredentials = new ProfileCredentialsProvider();
        dynamoDBCredentials = new ProfileCredentialsProvider();
        recordProcessorFactory = new StreamsRecordProcessorFactory(dynamoDBCredentials, dynamodbEndpoint, serviceName, destTable);

        /* ===== REQUIRED =====
         * Users will have to explicitly instantiate and configure the adapter, then pass it to
         * the KCL worker.
         */
        adapterClient = new AmazonDynamoDBStreamsAdapterClient(streamsCredentials, new ClientConfiguration());
        adapterClient.setEndpoint(streamsEndpoint);

        dynamoDBClient = new AmazonDynamoDBClient(dynamoDBCredentials, new ClientConfiguration());
        dynamoDBClient.setEndpoint(dynamodbEndpoint);

        cloudWatchClient = new AmazonCloudWatchClient(dynamoDBCredentials, new ClientConfiguration());

        setUpTables();

        workerConfig = new KinesisClientLibConfiguration("streams-adapter-demo",
                streamArn,
                streamsCredentials,
                "streams-demo-worker")
            .withMaxRecords(1000)
            .withIdleTimeBetweenReadsInMillis(500)
            .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        System.out.println("Creating worker for stream: " + streamArn);
        worker = new Worker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient);
        System.out.println("Starting worker...");
        Thread t = new Thread(worker);
        t.start();

        Thread.sleep(25000);
        worker.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
            System.out.println("Scan result is equal.");
        }
        else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDBClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDBClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResult result = StreamsAdapterDemoHelper.describeTable(dynamoDBClient, tableName);
            created = result.getTable().getTableStatus().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            }
            else {
                retries++;
                try {
                    Thread.sleep(1000);
                }
                catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");

        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
        dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));

        System.exit(returnValue);
    }
}
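Note that dynamodbEndpoint and streamsEndpoint are placeholders. Before you run the program, replace them with the DynamoDB and DynamoDB Streams endpoints for your Region. For example, if you run the demo in us-west-2 (the Region shown in the sample output above), the fields might look like the following sketch:

// Example only (assumes the us-west-2 Region); substitute the endpoints for your own Region.
private static String dynamodbEndpoint = "dynamodb.us-west-2.amazonaws.com";
private static String streamsEndpoint = "streams.dynamodb.us-west-2.amazonaws.com";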

StreamsRecordProcessor.java

// Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Licensed under the Apache License, Version 2.0.
package com.amazonaws.codesamples.gsg;

import java.nio.charset.Charset;
import java.util.List;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

public class StreamsRecordProcessor implements IRecordProcessor {

    private Integer checkpointCounter;

    private final AmazonDynamoDBClient dynamoDBClient;
    private final String tableName;

    public StreamsRecordProcessor(AmazonDynamoDBClient dynamoDBClient, String tableName) {
        this.dynamoDBClient = dynamoDBClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(String shardId) {
        checkpointCounter = 0;
    }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        for (Record record : records) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record).getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    checkpointer.checkpoint();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        if (reason == ShutdownReason.TERMINATE) {
            try {
                checkpointer.checkpoint();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

StreamsRecordProcessorFactory.java

// Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Licensed under the Apache License, Version 2.0.
package com.amazonaws.codesamples.gsg;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {

    private final AWSCredentialsProvider dynamoDBCredentials;
    private final String dynamoDBEndpoint;
    private final String tableName;

    public StreamsRecordProcessorFactory(
            AWSCredentialsProvider dynamoDBCredentials,
            String dynamoDBEndpoint,
            String serviceName,
            String tableName) {
        this.dynamoDBCredentials = dynamoDBCredentials;
        this.dynamoDBEndpoint = dynamoDBEndpoint;
        this.tableName = tableName;
    }

    @Override
    public IRecordProcessor createProcessor() {
        AmazonDynamoDBClient dynamoDBClient = new AmazonDynamoDBClient(dynamoDBCredentials, new ClientConfiguration());
        dynamoDBClient.setEndpoint(dynamoDBEndpoint);
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}

StreamsAdapterDemoHelper.java

// Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Licensed under the Apache License, Version 2.0.
package com.amazonaws.codesamples.gsg;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.CreateTableResult;
import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(AmazonDynamoDBClient client, String tableName) {
        java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
        attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

        java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
        keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition key

        ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput()
            .withReadCapacityUnits(2L).withWriteCapacityUnits(2L);

        StreamSpecification streamSpecification = new StreamSpecification();
        streamSpecification.setStreamEnabled(true);
        streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
        CreateTableRequest createTableRequest = new CreateTableRequest()
            .withTableName(tableName)
            .withAttributeDefinitions(attributeDefinitions)
            .withKeySchema(keySchema)
            .withProvisionedThroughput(provisionedThroughput)
            .withStreamSpecification(streamSpecification);

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResult result = client.createTable(createTableRequest);
            return result.getTableDescription().getLatestStreamArn();
        }
        catch (ResourceInUseException e) {
            System.out.println("Table already exists.");
            return describeTable(client, tableName).getTable().getLatestStreamArn();
        }
    }

    public static DescribeTableResult describeTable(AmazonDynamoDBClient client, String tableName) {
        return client.describeTable(new DescribeTableRequest().withTableName(tableName));
    }

    public static ScanResult scanTable(AmazonDynamoDBClient client, String tableName) {
        return client.scan(new ScanRequest().withTableName(tableName));
    }

    public static void putItem(AmazonDynamoDBClient client, String tableName, String id, String val) {
        java.util.Map<String, AttributeValue> item = new HashMap<String, AttributeValue>();
        item.put("Id", new AttributeValue().withN(id));
        item.put("attribute-1", new AttributeValue().withS(val));

        PutItemRequest putItemRequest = new PutItemRequest()
            .withTableName(tableName)
            .withItem(item);
        client.putItem(putItemRequest);
    }

    public static void putItem(AmazonDynamoDBClient client, String tableName, java.util.Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = new PutItemRequest()
            .withTableName(tableName)
            .withItem(items);
        client.putItem(putItemRequest);
    }

    public static void updateItem(AmazonDynamoDBClient client, String tableName, String id, String val) {
        java.util.Map<String, AttributeValue> key = new HashMap<String, AttributeValue>();
        key.put("Id", new AttributeValue().withN(id));

        Map<String, AttributeValueUpdate> attributeUpdates = new HashMap<String, AttributeValueUpdate>();
        AttributeValueUpdate update = new AttributeValueUpdate()
            .withAction(AttributeAction.PUT)
            .withValue(new AttributeValue().withS(val));
        attributeUpdates.put("attribute-2", update);

        UpdateItemRequest updateItemRequest = new UpdateItemRequest()
            .withTableName(tableName)
            .withKey(key)
            .withAttributeUpdates(attributeUpdates);
        client.updateItem(updateItemRequest);
    }

    public static void deleteItem(AmazonDynamoDBClient client, String tableName, String id) {
        java.util.Map<String, AttributeValue> key = new HashMap<String, AttributeValue>();
        key.put("Id", new AttributeValue().withN(id));

        DeleteItemRequest deleteItemRequest = new DeleteItemRequest()
            .withTableName(tableName)
            .withKey(key);
        client.deleteItem(deleteItemRequest);
    }
}
