Programme complet : adaptateur DynamoDB Streams Kinesis - Amazon DynamoDB

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Programme complet : adaptateur DynamoDB Streams Kinesis

Voici le programme Java complet qui effectue les tâches décrites dans Démonstration : adaptateur Kinesis DynamoDB Streams. Lorsque vous l'exécutez, vous devriez obtenir une sortie similaire à la suivante.

Starting demo...
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating scheduler for stream arn:aws:dynamodb:us-east-1:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting scheduler...
Stopping scheduler...
Scan result is equal.
Done.
Important

Pour exécuter ce programme, assurez-vous que l'application cliente a accès à DynamoDB et à Amazon CloudWatch à l'aide de politiques. Pour de plus amples informations, veuillez consulter Politiques basées sur l'identité pour DynamoDB.

Le code source se compose de quatre fichiers .java :

  • StreamsAdapterDemo.java

  • StreamsRecordProcessor.java

  • StreamsAdapterDemoProcessorFactory.java

  • StreamsAdapterDemoHelper.java

StreamsAdapterDemo.java

package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.RetrievalConfig; public class StreamsAdapterDemo { private static DynamoDbAsyncClient dynamoDbAsyncClient; private static CloudWatchAsyncClient cloudWatchAsyncClient; private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient; private static String tablePrefix = "KCL-Demo"; private static String streamArn; private static Region region = Region.US_EAST_1; private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); public static void main( String[] args ) throws Exception { System.out.println("Starting demo..."); dynamoDbAsyncClient = DynamoDbAsyncClient.builder() .credentialsProvider(credentialsProvider) .region(region) .build(); cloudWatchAsyncClient = CloudWatchAsyncClient.builder() .credentialsProvider(credentialsProvider) 
.region(region) .build(); amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region); String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; setUpTables(); StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)); ShardRecordProcessorFactory shardRecordProcessorFactory = new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, "streams-adapter-demo", amazonDynamoDbStreamsAdapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, "streams-demo-worker", shardRecordProcessorFactory ); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); System.out.println("Creating scheduler for stream " + streamArn); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, amazonDynamoDbStreamsAdapterClient ); System.out.println("Starting scheduler..."); Thread t = new Thread(scheduler); t.start(); Thread.sleep(250000); System.out.println("Stopping scheduler..."); scheduler.shutdown(); t.join(); if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); } System.out.println("Done."); cleanupAndExit(0); } private static void setUpTables() { String srcTable = tablePrefix + "-src"; String destTable = 
tablePrefix + "-dest"; streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable); StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable); awaitTableCreation(srcTable); performOps(srcTable); } private static void awaitTableCreation(String tableName) { Integer retries = 0; Boolean created = false; while (!created && retries < 100) { DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName); created = result.table().tableStatusAsString().equals("ACTIVE"); if (created) { System.out.println("Table is active."); return; } else { retries++; try { Thread.sleep(1000); } catch (InterruptedException e) { // do nothing } } } System.out.println("Timeout after table creation. Exiting..."); cleanupAndExit(1); } private static void performOps(String tableName) { StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102"); } private static void cleanupAndExit(Integer returnValue) { String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build()); dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build()); System.exit(returnValue); } }

StreamsRecordProcessor.java

package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * Record processor that replays stream records into another table.
 *
 * <p>INSERT and MODIFY records are applied by writing the record's new image to the target
 * table; REMOVE records are applied by deleting the item with the record's {@code Id} key.
 */
public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    /** Checkpoint once every this many processed records. */
    private static final int CHECKPOINT_INTERVAL = 10;

    private int checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    /**
     * @param dynamoDbAsyncClient client used to write to the target table
     * @param tableName name of the target table
     */
    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    /** Resets the record counter that drives periodic checkpointing. */
    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    /**
     * Prints each record's payload, applies the change to the target table and checkpoints after
     * every {@value #CHECKPOINT_INTERVAL} records.
     *
     * @param dynamoDBStreamsProcessRecordsInput batch of stream records plus the checkpointer
     */
    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record : dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);

            Record streamRecord = record.getRecord();
            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(
                            dynamoDbAsyncClient, tableName, streamRecord.dynamodb().newImage());
                    // Without this break the item just written would be deleted by the
                    // REMOVE branch below.
                    break;
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(
                            dynamoDbAsyncClient, tableName, streamRecord.dynamodb().keys().get("Id").n());
                    break;
                default:
                    // Any other event type is not replicated.
                    break;
            }

            checkpointCounter += 1;
            if (checkpointCounter % CHECKPOINT_INTERVAL == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    // A failed checkpoint is reported and processing continues with the next record.
                    e.printStackTrace();
                }
            }
        }
    }

    /** Reports that the lease was lost; no checkpoint is attempted. */
    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    /** Checkpoints when the end of the shard has been reached. */
    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    /** Checkpoints when a shutdown has been requested. */
    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}

StreamsAdapterDemoProcessorFactory.java

package com.amazonaws.codesamples; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory { private final String tableName; private final DynamoDbAsyncClient dynamoDbAsyncClient; public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) { this.tableName = tableName; this.dynamoDbAsyncClient = asyncClient; } @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName); } }

StreamsAdapterDemoHelper.java

package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Static helpers used by the demo. Every helper issues one request through the asynchronous
 * client and blocks on the returned future, so callers see synchronous behavior.
 */
public class StreamsAdapterDemoHelper {

    /**
     * Creates an on-demand table keyed by the numeric attribute {@code Id}, with a stream enabled
     * that carries the new image of each changed item. If the table already exists, the stream
     * ARN of the existing table is looked up instead.
     *
     * @param client client used for the request
     * @param tableName name of the table to create
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        AttributeDefinition idAttribute = AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build();
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(idAttribute);

        KeySchemaElement partitionKey = KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build();
        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(partitionKey);

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            // The failure of the joined future is inspected through the exception's cause.
            boolean tableAlreadyExists = e.getCause() instanceof ResourceInUseException;
            if (!tableAlreadyExists) {
                throw e;
            }
            System.out.println("Table already exists.");
            return describeTable(client, tableName).table().latestStreamArn();
        }
    }

    /**
     * Describes a table.
     *
     * @param client client used for the request
     * @param tableName table to describe
     * @return the service response
     */
    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        DescribeTableRequest request = DescribeTableRequest.builder()
                .tableName(tableName)
                .build();
        return client.describeTable(request).join();
    }

    /**
     * Issues a single scan request against a table.
     *
     * @param dynamoDbClient client used for the request
     * @param tableName table to scan
     * @return the service response
     */
    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        ScanRequest request = ScanRequest.builder()
                .tableName(tableName)
                .build();
        return dynamoDbClient.scan(request).join();
    }

    /**
     * Writes an item consisting of the numeric key {@code Id} and the string attribute
     * {@code attribute-1}.
     *
     * @param dynamoDbClient client used for the request
     * @param tableName table to write to
     * @param id value of the {@code Id} key, as a numeric string
     * @param val value stored in {@code attribute-1}
     */
    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = idKey(id);
        item.put("attribute-1", AttributeValue.builder().s(val).build());
        putItem(dynamoDbClient, tableName, item);
    }

    /**
     * Writes the given attribute map as one item.
     *
     * @param dynamoDbClient client used for the request
     * @param tableName table to write to
     * @param items attributes of the item to write
     */
    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    /**
     * Sets the string attribute {@code attribute-2} on the item with the given key.
     *
     * @param dynamoDbClient client used for the request
     * @param tableName table to update
     * @param id value of the {@code Id} key, as a numeric string
     * @param val value stored in {@code attribute-2}
     */
    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = idKey(id);

        // The attribute name and the new value are both passed through expression placeholders.
        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();
        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    /**
     * Deletes the item with the given key.
     *
     * @param dynamoDbClient client used for the request
     * @param tableName table to delete from
     * @param id value of the {@code Id} key, as a numeric string
     */
    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(idKey(id))
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }

    /** Builds a new mutable map holding only the numeric {@code Id} key attribute. */
    private static Map<String, AttributeValue> idKey(String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());
        return key;
    }
}