Amazon DynamoDB
Developer Guide (API Version 2012-08-10)

Walkthrough: DynamoDB Streams Kinesis Adapter

This section is a walkthrough of a Java application that uses the Kinesis Client Library and the DynamoDB Streams Kinesis Adapter. The application shows an example of data replication, where write activity from one table is applied to a second table, with both tables' contents staying in sync. For the source code, see Complete Program: DynamoDB Streams Kinesis Adapter.

The program does the following:

  1. Creates two DynamoDB tables named KCL-Demo-src and KCL-Demo-dst. Each of these tables has a stream enabled on it.

  2. Generates update activity in the source table by adding, updating, and deleting items. This causes data to be written to the table's stream.

  3. Reads the records from the stream, reconstructs them as DynamoDB requests, and applies the requests to the destination table.

  4. Scans the source and destination tables to verify that their contents are identical.

  5. Cleans up by deleting the tables.

These steps are described in the following sections, and the complete application is shown at the end of the walkthrough.

Step 1: Create DynamoDB Tables

The first step is to create two DynamoDB tables: a source table and a destination table. The StreamViewType on the source table's stream is NEW_IMAGE, which means that whenever an item in this table is modified, the item's "after" image is written to the stream. In this way, the stream keeps track of all write activity on the table.

The following code snippet builds and sends the request used to create each of the two tables.

import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.dynamodbv2.model.*;

// The numeric attribute "Id" is the table's only key attribute.
List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput()
    .withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

// Enable a stream that records each item as it appears after every write.
StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);

CreateTableRequest createTableRequest = new CreateTableRequest()
    .withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions)
    .withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput)
    .withStreamSpecification(streamSpecification);

// Send the request (once for the source table name, once for the destination).
dynamoDBClient.createTable(createTableRequest);
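To make the NEW_IMAGE choice concrete, the following stdlib-only Java sketch models which parts of an item a stream record includes under each of DynamoDB's four stream view types (KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES). It is a simplified model, not the AWS SDK: the StreamViewTypes class, the recordContents method, and the attr attribute are hypothetical, and items are represented as plain string maps.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of which item images a stream record carries. The four
// view type names match DynamoDB's; everything else here is hypothetical.
final class StreamViewTypes {

    enum StreamViewType { KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES }

    enum EventName { INSERT, MODIFY, REMOVE }

    /** Returns the sections ("Keys", "NewImage", "OldImage") a record would contain. */
    static Map<String, Map<String, String>> recordContents(
            StreamViewType viewType, EventName event, Map<String, String> keys,
            Map<String, String> before, Map<String, String> after) {
        // An INSERT has no "before" image and a REMOVE has no "after" image.
        Map<String, String> oldImage = event == EventName.INSERT ? null : before;
        Map<String, String> newImage = event == EventName.REMOVE ? null : after;

        boolean wantNew = viewType == StreamViewType.NEW_IMAGE
                || viewType == StreamViewType.NEW_AND_OLD_IMAGES;
        boolean wantOld = viewType == StreamViewType.OLD_IMAGE
                || viewType == StreamViewType.NEW_AND_OLD_IMAGES;

        Map<String, Map<String, String>> contents = new LinkedHashMap<>();
        contents.put("Keys", keys); // the key attributes are always included
        if (wantNew && newImage != null) contents.put("NewImage", newImage);
        if (wantOld && oldImage != null) contents.put("OldImage", oldImage);
        return contents;
    }

    public static void main(String[] args) {
        Map<String, String> keys = Map.of("Id", "101");
        Map<String, String> itemV1 = Map.of("Id", "101", "attr", "test1");
        Map<String, String> itemV2 = Map.of("Id", "101", "attr", "test2");

        // The source table in this walkthrough uses NEW_IMAGE.
        System.out.println(recordContents(StreamViewType.NEW_IMAGE, EventName.MODIFY, keys, itemV1, itemV2).keySet());
        System.out.println(recordContents(StreamViewType.NEW_IMAGE, EventName.REMOVE, keys, itemV2, null).keySet());
    }
}
```

Under NEW_IMAGE, the MODIFY record prints [Keys, NewImage] and the REMOVE record prints only [Keys]. That is why the record processor in Step 3 reads the key attributes, not a new image, when it handles a REMOVE event.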

Step 2: Generate Update Activity in Source Table

The next step is to generate some write activity on the source table. While this activity is taking place, the source table's stream is also updated in near real time.

The application defines a helper class with methods that call the PutItem, UpdateItem, and DeleteItem API actions for writing the data. The following code snippet shows how these methods are used.

// Insert, update, and then delete item 101; repeat for item 102.
// Each call produces one record in the source table's stream.
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
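The following stdlib-only sketch models why each of these six calls shows up in the stream as exactly one record. InMemoryStreamTable and its StreamRecord type are hypothetical stand-ins for a stream-enabled table; only the event names INSERT, MODIFY, and REMOVE come from DynamoDB Streams. In this model, a write to a new key is recorded as an INSERT, an overwrite of an existing key as a MODIFY, and a delete as a REMOVE.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a table with a stream enabled. Every
// write that changes the table appends one record to the "stream".
final class InMemoryStreamTable {

    // newValue mimics NEW_IMAGE: the value after the write, or null for a delete.
    record StreamRecord(long sequenceNumber, String eventName, String id, String newValue) {}

    private final Map<String, String> items = new HashMap<>();
    private final List<StreamRecord> stream = new ArrayList<>();
    private long nextSequenceNumber = 1;

    // In this model, put and update differ only in name.
    void putItem(String id, String value) { write(id, value); }

    void updateItem(String id, String value) { write(id, value); }

    void deleteItem(String id) {
        // Deleting a missing item changes nothing, so nothing is recorded.
        if (items.remove(id) != null) {
            stream.add(new StreamRecord(nextSequenceNumber++, "REMOVE", id, null));
        }
    }

    private void write(String id, String value) {
        String previous = items.put(id, value);
        // A new key is an INSERT; overwriting an existing key is a MODIFY.
        String eventName = (previous == null) ? "INSERT" : "MODIFY";
        stream.add(new StreamRecord(nextSequenceNumber++, eventName, id, value));
    }

    List<StreamRecord> stream() { return List.copyOf(stream); }

    public static void main(String[] args) {
        InMemoryStreamTable source = new InMemoryStreamTable();
        // The same six writes as the snippet above.
        source.putItem("101", "test1");
        source.updateItem("101", "test2");
        source.deleteItem("101");
        source.putItem("102", "demo3");
        source.updateItem("102", "demo4");
        source.deleteItem("102");
        for (StreamRecord r : source.stream()) {
            System.out.println(r.sequenceNumber() + " " + r.eventName() + " " + r.id());
        }
    }
}
```

Running main prints six records: INSERT, MODIFY, and REMOVE for item 101, followed by the same three events for item 102, with sequence numbers 1 through 6. One behavior the sketch leaves out: DynamoDB Streams does not write a record for a PutItem or UpdateItem that leaves the item unchanged.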

Step 3: Process the Stream

Now the program begins processing the stream. The DynamoDB Streams Kinesis Adapter acts as a transparent layer between the KCL and the DynamoDB Streams endpoint, so that the code can fully leverage the KCL rather than having to make low-level DynamoDB Streams calls. The program performs the following tasks:

  • It defines a record processor class, StreamsRecordProcessor, with methods that comply with the KCL interface definition: initialize, processRecords, and shutdown. The processRecords method contains the logic required for reading from the source table's stream and writing to the destination table.

  • It defines a factory class, StreamsRecordProcessorFactory, that creates instances of the record processor class. Java programs that use the KCL require such a factory.

  • It instantiates a new KCL Worker, which is associated with the class factory.

  • It shuts down the Worker when record processing is complete.

To learn more about the KCL interface definition, see Developing Amazon Kinesis Consumers Using the Amazon Kinesis Client Library in the Amazon Kinesis Developer Guide.
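The division of labor among the record processor, its factory, and the Worker can be illustrated without the KCL itself. In the following Java sketch, RecordProcessor, RecordProcessorFactory, LoggingProcessor, and runWorker are hypothetical names that only mirror the shape described above; they are not the KCL's interfaces or its Worker class, which additionally handle leases, threading, and checkpoint storage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, stdlib-only model of the KCL consumer pattern. These names only
// mirror the shape of a record processor (initialize, processRecords, shutdown)
// and its factory; they are not the KCL's own interfaces.
final class ProcessorLifecycleSketch {

    interface RecordProcessor {
        void initialize(String shardId);
        void processRecords(List<String> records);
        void shutdown(String reason);
    }

    interface RecordProcessorFactory {
        RecordProcessor createProcessor();
    }

    /** Toy worker: one new processor per shard, driven through its whole lifecycle. */
    static void runWorker(RecordProcessorFactory factory,
                          Map<String, List<List<String>>> batchesByShard) {
        for (Map.Entry<String, List<List<String>>> shard : batchesByShard.entrySet()) {
            RecordProcessor processor = factory.createProcessor();
            processor.initialize(shard.getKey());
            for (List<String> batch : shard.getValue()) {
                processor.processRecords(batch);
            }
            processor.shutdown("finished");
        }
    }

    /** Example processor that appends each lifecycle call to a shared log. */
    static final class LoggingProcessor implements RecordProcessor {
        private final List<String> log;
        private String shardId;

        LoggingProcessor(List<String> log) { this.log = log; }

        @Override public void initialize(String shardId) {
            this.shardId = shardId;
            log.add("init " + shardId);
        }

        @Override public void processRecords(List<String> records) {
            log.add(shardId + " got " + records.size());
        }

        @Override public void shutdown(String reason) {
            log.add("shutdown " + shardId + " " + reason);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // The lambda plays the role of the factory class.
        runWorker(() -> new LoggingProcessor(log),
                  Map.of("shard-0", List.of(List.of("r1", "r2"), List.of("r3"))));
        log.forEach(System.out::println);
    }
}
```

The factory exists so that the worker can create a separate processor instance for each shard it reads, which is why runWorker calls createProcessor once per shard instead of sharing a single processor object.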

The following code snippet shows the main loop in the processRecords method of StreamsRecordProcessor. The switch statement determines which action to perform, based on the event name (INSERT, MODIFY, or REMOVE) that appears in the stream record.

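import java.nio.charset.Charset;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.model.Record;

for (Record record : records) {
    // Print the raw record payload as UTF-8 text.
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);

    if (record instanceof RecordAdapter) {
        // Unwrap the DynamoDB Streams record that the adapter wraps in a Kinesis Record.
        com.amazonaws.services.dynamodbv2.model.Record streamRecord =
            ((RecordAdapter) record).getInternalObject();

        switch (streamRecord.getEventName()) {
            case "INSERT":
            case "MODIFY":
                // NEW_IMAGE records carry the full item, so write it to the destination table.
                StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                    streamRecord.getDynamodb().getNewImage());
                break;
            case "REMOVE":
                // Deletes have no new image; use the key attributes instead.
                StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                    streamRecord.getDynamodb().getKeys().get("Id").getN());
                break;
        }
    }

    // Checkpoint after every 10 records.
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}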
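Stripped of the AWS types, the loop above reduces to applying each record to the destination and checkpointing periodically. The following sketch does that against an in-memory map. StreamRecord, Checkpointer, and ReplicationSketch are hypothetical, and unlike the no-argument checkpoint() call in the snippet, this sketch passes the sequence number of the record it has just applied.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only sketch of the replication loop. StreamRecord and Checkpointer are
// hypothetical stand-ins, not the AWS SDK or KCL types.
final class ReplicationSketch {

    record StreamRecord(String sequenceNumber, String eventName,
                        Map<String, String> keys, Map<String, String> newImage) {}

    interface Checkpointer {
        void checkpoint(String sequenceNumber);
    }

    static final int CHECKPOINT_INTERVAL = 10; // same cadence as the snippet above

    private final Map<String, Map<String, String>> destination; // items keyed by Id
    private final Checkpointer checkpointer;
    private long processedCount = 0;

    ReplicationSketch(Map<String, Map<String, String>> destination, Checkpointer checkpointer) {
        this.destination = destination;
        this.checkpointer = checkpointer;
    }

    void processRecords(List<StreamRecord> records) {
        for (StreamRecord r : records) {
            switch (r.eventName()) {
                // The new image is the whole item, so a put replaces the destination copy.
                case "INSERT", "MODIFY" -> destination.put(r.newImage().get("Id"), r.newImage());
                // A delete has no new image; use the key attributes instead.
                case "REMOVE" -> destination.remove(r.keys().get("Id"));
                default -> throw new IllegalArgumentException("Unexpected event: " + r.eventName());
            }
            processedCount++;
            if (processedCount % CHECKPOINT_INTERVAL == 0) {
                checkpointer.checkpoint(r.sequenceNumber());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> destination = new HashMap<>();
        List<String> checkpoints = new ArrayList<>();
        ReplicationSketch replicator = new ReplicationSketch(destination, checkpoints::add);

        List<StreamRecord> batch = new ArrayList<>();
        for (int i = 1; i <= 12; i++) {
            String id = String.valueOf(100 + i);
            batch.add(new StreamRecord(String.valueOf(i), "INSERT",
                    Map.of("Id", id), Map.of("Id", id, "attr", "v" + i)));
        }
        batch.add(new StreamRecord("13", "REMOVE", Map.of("Id", "101"), null));

        replicator.processRecords(batch);
        System.out.println(destination.size() + " items, checkpoints at " + checkpoints);
    }
}
```

Running main prints "11 items, checkpoints at [10]": twelve inserts, one delete, and a single checkpoint after the tenth record. In this sketch, replaying an INSERT, MODIFY, or REMOVE that was already applied leaves the destination unchanged, so reprocessing the records that follow the last checkpoint is harmless.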

Step 4: Ensure Both Tables Have Identical Contents

At this point, the source and destination tables' contents should be in sync. The application issues Scan requests against both tables to verify that their contents are, in fact, identical.

The StreamsAdapterDemoHelper class contains a scanTable method that calls the low-level Scan API. The following code snippet shows how it is used.

if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
        .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
} else {
    System.out.println("Tables are different!");
}
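List.equals compares the two item lists position by position, so the check above also assumes that both scans return their items in the same order. If you would rather not depend on that, one option, sketched here with plain Java collections, is to index each scan result by its Id key and compare the resulting maps. ScanCompareSketch and its methods are hypothetical, and the attr attribute is illustrative; with the AWS SDK, item values would be AttributeValue objects rather than strings.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Order-insensitive comparison of two scan results. Each item is modeled as an
// attribute-name -> value map that contains the partition key "Id".
final class ScanCompareSketch {

    /** Indexes items by their Id; a duplicate Id would indicate a malformed result. */
    static Map<String, Map<String, String>> indexById(List<Map<String, String>> items) {
        Map<String, Map<String, String>> byId = new HashMap<>();
        for (Map<String, String> item : items) {
            if (byId.put(item.get("Id"), item) != null) {
                throw new IllegalStateException("Duplicate Id: " + item.get("Id"));
            }
        }
        return byId;
    }

    /** True when both results contain the same items, regardless of order. */
    static boolean sameContents(List<Map<String, String>> src, List<Map<String, String>> dst) {
        return indexById(src).equals(indexById(dst));
    }

    public static void main(String[] args) {
        List<Map<String, String>> src = List.of(
                Map.of("Id", "101", "attr", "a"), Map.of("Id", "102", "attr", "b"));
        List<Map<String, String>> dst = List.of(
                Map.of("Id", "102", "attr", "b"), Map.of("Id", "101", "attr", "a"));

        System.out.println(src.equals(dst));        // position-by-position comparison
        System.out.println(sameContents(src, dst)); // keyed comparison
    }
}
```

The first line prints false and the second prints true, because the two lists hold the same items in a different order.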

Step 5: Clean Up

The demo is complete, so the application deletes the source and destination tables, as shown in the following code snippet.

dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));

Even after the tables are deleted, their streams remain available for up to 24 hours, after which they are automatically deleted.