Complete program: DynamoDB Streams Kinesis adapter - Amazon DynamoDB

Complete program: DynamoDB Streams Kinesis adapter

The following is the complete Java program that performs the tasks described in Walkthrough: DynamoDB Streams Kinesis adapter. When you run it, you should see output similar to the following.

Creating table KCL-Demo-src Creating table KCL-Demo-dest Table is active. Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601 Starting worker... Scan result is equal. Done.
Important

To run this program, ensure that the client application has access to DynamoDB and Amazon CloudWatch using policies. For more information, see Identity-based policies for DynamoDB.

The source code consists of four .java files:

  • StreamsAdapterDemo.java

  • StreamsRecordProcessor.java

  • StreamsRecordProcessorFactory.java

  • StreamsAdapterDemoHelper.java

StreamsAdapterDemo.java

package com.amazonaws.codesamples; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.regions.Regions; import com.amazonaws.services.cloudwatch.AmazonCloudWatch; import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder; import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest; import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; public class StreamsAdapterDemo { private static Worker worker; private static KinesisClientLibConfiguration workerConfig; private static IRecordProcessorFactory recordProcessorFactory; private static AmazonDynamoDB dynamoDBClient; private static AmazonCloudWatch cloudWatchClient; private static AmazonDynamoDBStreams dynamoDBStreamsClient; private static AmazonDynamoDBStreamsAdapterClient adapterClient; private static String tablePrefix = "KCL-Demo"; private static String streamArn; private static Regions awsRegion = Regions.US_EAST_2; private static AWSCredentialsProvider awsCredentialsProvider = DefaultAWSCredentialsProviderChain.getInstance(); /** * @param args */ public static void main(String[] args) throws Exception { System.out.println("Starting demo..."); dynamoDBClient = AmazonDynamoDBClientBuilder.standard() .withRegion(awsRegion) .build(); cloudWatchClient = AmazonCloudWatchClientBuilder.standard() .withRegion(awsRegion) .build(); dynamoDBStreamsClient = AmazonDynamoDBStreamsClientBuilder.standard() .withRegion(awsRegion) .build(); adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient); String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; recordProcessorFactory = new StreamsRecordProcessorFactory(dynamoDBClient, destTable); setUpTables(); workerConfig = new KinesisClientLibConfiguration("streams-adapter-demo", streamArn, awsCredentialsProvider, "streams-demo-worker") .withMaxRecords(1000) .withIdleTimeBetweenReadsInMillis(500) .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); System.out.println("Creating worker for stream: " + streamArn); worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient); System.out.println("Starting worker..."); Thread t = new Thread(worker); t.start(); Thread.sleep(25000); worker.shutdown(); t.join(); if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); } System.out.println("Done."); cleanupAndExit(0); } private static void setUpTables() { String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; streamArn = StreamsAdapterDemoHelper.createTable(dynamoDBClient, srcTable); StreamsAdapterDemoHelper.createTable(dynamoDBClient, destTable); awaitTableCreation(srcTable); performOps(srcTable); } private static void awaitTableCreation(String tableName) { Integer retries = 0; Boolean created = false; while (!created && retries < 100) { DescribeTableResult result = StreamsAdapterDemoHelper.describeTable(dynamoDBClient, tableName); created = result.getTable().getTableStatus().equals("ACTIVE"); if (created) { System.out.println("Table is active."); return; } else { retries++; try { Thread.sleep(1000); } catch (InterruptedException e) { // do nothing } } } System.out.println("Timeout after table creation. Exiting..."); cleanupAndExit(1); } private static void performOps(String tableName) { StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102"); } private static void cleanupAndExit(Integer returnValue) { String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable)); System.exit(returnValue); } }

StreamsRecordProcessor.java

package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.model.Record; import java.nio.charset.Charset; public class StreamsRecordProcessor implements IRecordProcessor { private Integer checkpointCounter; private final AmazonDynamoDB dynamoDBClient; private final String tableName; public StreamsRecordProcessor(AmazonDynamoDB dynamoDBClient2, String tableName) { this.dynamoDBClient = dynamoDBClient2; this.tableName = tableName; } @Override public void initialize(InitializationInput initializationInput) { checkpointCounter = 0; } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { processRecordsInput.getCheckpointer().checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } }

StreamsRecordProcessorFactory.java

package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { private final String tableName; private final AmazonDynamoDB dynamoDBClient; public StreamsRecordProcessorFactory(AmazonDynamoDB dynamoDBClient, String tableName) { this.tableName = tableName; this.dynamoDBClient = dynamoDBClient; } @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }

StreamsAdapterDemoHelper.java

package com.amazonaws.codesamples; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.AttributeAction; import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; import com.amazonaws.services.dynamodbv2.model.CreateTableResult; import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest; import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest; import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.KeyType; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; import com.amazonaws.services.dynamodbv2.model.PutItemRequest; import com.amazonaws.services.dynamodbv2.model.ResourceInUseException; import com.amazonaws.services.dynamodbv2.model.ScanRequest; import com.amazonaws.services.dynamodbv2.model.ScanResult; import com.amazonaws.services.dynamodbv2.model.StreamSpecification; import com.amazonaws.services.dynamodbv2.model.StreamViewType; import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest; public class StreamsAdapterDemoHelper { /** * @return StreamArn */ public static String createTable(AmazonDynamoDB client, String tableName) { java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification); try { System.out.println("Creating table " + tableName); CreateTableResult result = client.createTable(createTableRequest); return result.getTableDescription().getLatestStreamArn(); } catch (ResourceInUseException e) { System.out.println("Table already exists."); return describeTable(client, tableName).getTable().getLatestStreamArn(); } } public static DescribeTableResult describeTable(AmazonDynamoDB client, String tableName) { return client.describeTable(new DescribeTableRequest().withTableName(tableName)); } public static ScanResult scanTable(AmazonDynamoDB dynamoDBClient, String tableName) { return dynamoDBClient.scan(new ScanRequest().withTableName(tableName)); } public static void putItem(AmazonDynamoDB dynamoDBClient, String tableName, String id, String val) { java.util.Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); item.put("Id", new AttributeValue().withN(id)); item.put("attribute-1", new AttributeValue().withS(val)); PutItemRequest putItemRequest = new PutItemRequest().withTableName(tableName).withItem(item); dynamoDBClient.putItem(putItemRequest); } public static void putItem(AmazonDynamoDB dynamoDBClient, String tableName, java.util.Map<String, AttributeValue> items) { PutItemRequest putItemRequest = new PutItemRequest().withTableName(tableName).withItem(items); dynamoDBClient.putItem(putItemRequest); } public static void updateItem(AmazonDynamoDB dynamoDBClient, String tableName, String id, String val) { java.util.Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); key.put("Id", new AttributeValue().withN(id)); Map<String, AttributeValueUpdate> attributeUpdates = new HashMap<String, AttributeValueUpdate>(); AttributeValueUpdate update = new AttributeValueUpdate().withAction(AttributeAction.PUT) .withValue(new AttributeValue().withS(val)); attributeUpdates.put("attribute-2", update); UpdateItemRequest updateItemRequest = new UpdateItemRequest().withTableName(tableName).withKey(key) .withAttributeUpdates(attributeUpdates); dynamoDBClient.updateItem(updateItemRequest); } public static void deleteItem(AmazonDynamoDB dynamoDBClient, String tableName, String id) { java.util.Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); key.put("Id", new AttributeValue().withN(id)); DeleteItemRequest deleteItemRequest = new DeleteItemRequest().withTableName(tableName).withKey(key); dynamoDBClient.deleteItem(deleteItemRequest); } }