Programa completo: DynamoDB Streams Kinesis Adapter - Amazon DynamoDB

Programa completo: DynamoDB Streams Kinesis Adapter

Veja a seguir o programa Java completo que realiza as tarefas descritas em Demonstração: DynamoDB Streams Kinesis Adapter. Quando executá-lo, você verá uma saída semelhante à seguinte:

Starting demo...
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating scheduler for stream arn:aws:dynamodb:us-east-1:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting scheduler...
Stopping scheduler...
Scan result is equal.
Done.
Importante

Para executar esse programa, verifique se a aplicação cliente tem acesso ao DynamoDB e ao Amazon CloudWatch usando políticas. Para obter mais informações, consulte Políticas baseadas em identidade para o DynamoDB.

O código-fonte consiste em quatro arquivos .java:

  • StreamsAdapterDemo.java

  • StreamsRecordProcessor.java

  • StreamsAdapterDemoProcessorFactory.java

  • StreamsAdapterDemoHelper.java

StreamsAdapterDemo.java

package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.RetrievalConfig; public class StreamsAdapterDemo { private static DynamoDbAsyncClient dynamoDbAsyncClient; private static CloudWatchAsyncClient cloudWatchAsyncClient; private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient; private static String tablePrefix = "KCL-Demo"; private static String streamArn; private static Region region = Region.US_EAST_1; private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); public static void main( String[] args ) throws Exception { System.out.println("Starting demo..."); dynamoDbAsyncClient = DynamoDbAsyncClient.builder() .credentialsProvider(credentialsProvider) .region(region) .build(); cloudWatchAsyncClient = CloudWatchAsyncClient.builder() .credentialsProvider(credentialsProvider) 
.region(region) .build(); amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region); String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; setUpTables(); StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)); ShardRecordProcessorFactory shardRecordProcessorFactory = new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, "streams-adapter-demo", amazonDynamoDbStreamsAdapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, "streams-demo-worker", shardRecordProcessorFactory ); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); System.out.println("Creating scheduler for stream " + streamArn); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, amazonDynamoDbStreamsAdapterClient ); System.out.println("Starting scheduler..."); Thread t = new Thread(scheduler); t.start(); Thread.sleep(250000); System.out.println("Stopping scheduler..."); scheduler.shutdown(); t.join(); if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); } System.out.println("Done."); cleanupAndExit(0); } private static void setUpTables() { String srcTable = tablePrefix + "-src"; String destTable = 
tablePrefix + "-dest"; streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable); StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable); awaitTableCreation(srcTable); performOps(srcTable); } private static void awaitTableCreation(String tableName) { Integer retries = 0; Boolean created = false; while (!created && retries < 100) { DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName); created = result.table().tableStatusAsString().equals("ACTIVE"); if (created) { System.out.println("Table is active."); return; } else { retries++; try { Thread.sleep(1000); } catch (InterruptedException e) { // do nothing } } } System.out.println("Timeout after table creation. Exiting..."); cleanupAndExit(1); } private static void performOps(String tableName) { StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102"); } private static void cleanupAndExit(Integer returnValue) { String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build()); dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build()); System.exit(returnValue); } }

StreamsRecordProcessor.java

package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.Record; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { private Integer checkpointCounter; private final DynamoDbAsyncClient dynamoDbAsyncClient; private final String tableName; public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) { this.dynamoDbAsyncClient = dynamoDbAsyncClient; this.tableName = tableName; } @Override public void initialize(InitializationInput initializationInput) { this.checkpointCounter = 0; } @Override public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) { for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) { String data = new String(record.data().array(), StandardCharsets.UTF_8); System.out.println(data); Record streamRecord = record.getRecord(); switch (streamRecord.eventName()) { case INSERT: case MODIFY: StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, streamRecord.dynamodb().newImage()); case REMOVE: StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, 
streamRecord.dynamodb().keys().get("Id").n()); } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { System.out.println("Lease Lost"); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { e.printStackTrace(); } } }

StreamsAdapterDemoProcessorFactory.java

package com.amazonaws.codesamples; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory { private final String tableName; private final DynamoDbAsyncClient dynamoDbAsyncClient; public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) { this.tableName = tableName; this.dynamoDbAsyncClient = asyncClient; } @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName); } }

StreamsAdapterDemoHelper.java

package com.amazonaws.codesamples; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse; import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException; import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import software.amazon.awssdk.services.dynamodb.model.StreamSpecification; import software.amazon.awssdk.services.dynamodb.model.StreamViewType; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class StreamsAdapterDemoHelper { /** * @return StreamArn */ public static String createTable(DynamoDbAsyncClient client, String tableName) { List<AttributeDefinition> attributeDefinitions = new ArrayList<>(); attributeDefinitions.add(AttributeDefinition.builder() .attributeName("Id") .attributeType("N") .build()); List<KeySchemaElement> keySchema = new ArrayList<>(); keySchema.add(KeySchemaElement.builder() .attributeName("Id") .keyType(KeyType.HASH) // Partition key .build()); 
StreamSpecification streamSpecification = StreamSpecification.builder() .streamEnabled(true) .streamViewType(StreamViewType.NEW_IMAGE) .build(); CreateTableRequest createTableRequest = CreateTableRequest.builder() .tableName(tableName) .attributeDefinitions(attributeDefinitions) .keySchema(keySchema) .billingMode(BillingMode.PAY_PER_REQUEST) .streamSpecification(streamSpecification) .build(); try { System.out.println("Creating table " + tableName); CreateTableResponse result = client.createTable(createTableRequest).join(); return result.tableDescription().latestStreamArn(); } catch (Exception e) { if (e.getCause() instanceof ResourceInUseException) { System.out.println("Table already exists."); return describeTable(client, tableName).table().latestStreamArn(); } throw e; } } public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) { return client.describeTable(DescribeTableRequest.builder() .tableName(tableName) .build()) .join(); } public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) { return dynamoDbClient.scan(ScanRequest.builder() .tableName(tableName) .build()) .join(); } public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) { Map<String, AttributeValue> item = new HashMap<>(); item.put("Id", AttributeValue.builder().n(id).build()); item.put("attribute-1", AttributeValue.builder().s(val).build()); putItem(dynamoDbClient, tableName, item); } public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, Map<String, AttributeValue> items) { PutItemRequest putItemRequest = PutItemRequest.builder() .tableName(tableName) .item(items) .build(); dynamoDbClient.putItem(putItemRequest).join(); } public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) { Map<String, AttributeValue> key = new HashMap<>(); key.put("Id", AttributeValue.builder().n(id).build()); Map<String, String> 
expressionAttributeNames = new HashMap<>(); expressionAttributeNames.put("#attr2", "attribute-2"); Map<String, AttributeValue> expressionAttributeValues = new HashMap<>(); expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build()); UpdateItemRequest updateItemRequest = UpdateItemRequest.builder() .tableName(tableName) .key(key) .updateExpression("SET #attr2 = :val") .expressionAttributeNames(expressionAttributeNames) .expressionAttributeValues(expressionAttributeValues) .build(); dynamoDbClient.updateItem(updateItemRequest).join(); } public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) { Map<String, AttributeValue> key = new HashMap<>(); key.put("Id", AttributeValue.builder().n(id).build()); DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder() .tableName(tableName) .key(key) .build(); dynamoDbClient.deleteItem(deleteItemRequest).join(); } }