Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter
Hier finden Sie das vollständige Java-Programm, das die in Walkthrough: DynamoDB-Streams-Kinesis-Adapter beschriebenen Aufgaben durchführt. Wenn Sie das Programm ausführen, wird eine Ausgabe ähnlich der folgenden angezeigt:
Creating table KCL-Demo-src Creating table KCL-Demo-dest Table is active. Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601 Starting worker... Scan result is equal. Done.
Wichtig
Um dieses Programm auszuführen, stellen Sie sicher, dass die Client-Anwendung mithilfe von Richtlinien Zugriff auf DynamoDB und Amazon CloudWatch hat. Weitere Informationen finden Sie unter Identitätsbasierte Richtlinien für DynamoDB.
Der Quellcode besteht aus vier .java-Dateien:
-
StreamsAdapterDemo.java
-
StreamsRecordProcessor.java
-
StreamsAdapterDemoProcessorFactory.java
-
StreamsAdapterDemoHelper.java
StreamsAdapterDemo.java
package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.RetrievalConfig; public class StreamsAdapterDemo { private static DynamoDbAsyncClient dynamoDbAsyncClient; private static CloudWatchAsyncClient cloudWatchAsyncClient; private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient; private static String tablePrefix = "KCL-Demo"; private static String streamArn; private static Region region = Region.US_EAST_1; private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); public static void main( String[] args ) throws Exception { System.out.println("Starting demo..."); dynamoDbAsyncClient = DynamoDbAsyncClient.builder() .credentialsProvider(credentialsProvider) .region(region) .build(); cloudWatchAsyncClient = CloudWatchAsyncClient.builder() .credentialsProvider(credentialsProvider) 
.region(region) .build(); amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region); String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; setUpTables(); StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)); ShardRecordProcessorFactory shardRecordProcessorFactory = new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, "streams-adapter-demo", amazonDynamoDbStreamsAdapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, "streams-demo-worker", shardRecordProcessorFactory ); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); System.out.println("Creating scheduler for stream " + streamArn); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, amazonDynamoDbStreamsAdapterClient ); System.out.println("Starting scheduler..."); Thread t = new Thread(scheduler); t.start(); Thread.sleep(250000); System.out.println("Stopping scheduler..."); scheduler.shutdown(); t.join(); if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); } System.out.println("Done."); cleanupAndExit(0); } private static void setUpTables() { String srcTable = tablePrefix + "-src"; String destTable = 
tablePrefix + "-dest"; streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable); StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable); awaitTableCreation(srcTable); performOps(srcTable); } private static void awaitTableCreation(String tableName) { Integer retries = 0; Boolean created = false; while (!created && retries < 100) { DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName); created = result.table().tableStatusAsString().equals("ACTIVE"); if (created) { System.out.println("Table is active."); return; } else { retries++; try { Thread.sleep(1000); } catch (InterruptedException e) { // do nothing } } } System.out.println("Timeout after table creation. Exiting..."); cleanupAndExit(1); } private static void performOps(String tableName) { StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102"); } private static void cleanupAndExit(Integer returnValue) { String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build()); dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build()); System.exit(returnValue); } }
StreamsRecordProcessor.java
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * Record processor that replays stream records into a target table: inserts and
 * modifications are written as items, removals delete the item with the same {@code Id}.
 */
public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    /** A checkpoint is attempted every time this many records have been processed. */
    private static final int CHECKPOINT_INTERVAL = 10;

    /** Number of records processed since {@link #initialize(InitializationInput)}. */
    private int checkpointCounter;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    /**
     * @param dynamoDbAsyncClient client used to write to the target table
     * @param tableName           name of the target table
     */
    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    /** Resets the processed-record counter. */
    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    /**
     * Prints each record's payload, applies the change to the target table and
     * checkpoints after every {@link #CHECKPOINT_INTERVAL} records.
     *
     * @param dynamoDBStreamsProcessRecordsInput batch of stream records plus the checkpointer
     */
    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record : dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);

            Record streamRecord = record.getRecord();
            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                    // Without this break the freshly written item would be deleted
                    // again by the REMOVE branch below.
                    break;
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
                    break;
                default:
                    // Event types other than insert, modify and remove are not replicated.
                    break;
            }

            checkpointCounter += 1;
            if (checkpointCounter % CHECKPOINT_INTERVAL == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    // Best effort: a failed checkpoint is reported but does not stop processing.
                    e.printStackTrace();
                }
            }
        }
    }

    /** Reports that the lease was lost; no checkpoint is attempted here. */
    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    /** Checkpoints when the shard has ended; checkpoint failures are only printed. */
    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    /** Checkpoints when a shutdown is requested; checkpoint failures are only printed. */
    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
StreamsAdapterDemoProcessorFactory.java
package com.amazonaws.codesamples; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory { private final String tableName; private final DynamoDbAsyncClient dynamoDbAsyncClient; public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) { this.tableName = tableName; this.dynamoDbAsyncClient = asyncClient; } @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName); } }
StreamsAdapterDemoHelper.java
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Static convenience wrappers around the asynchronous DynamoDB client used by the demo.
 * Every wrapper blocks on the returned future, so callers see the calls as synchronous.
 */
public class StreamsAdapterDemoHelper {

    /**
     * Creates an on-demand table keyed by the numeric attribute {@code Id} with a
     * new-image stream enabled. If the create call fails because the table is already
     * in use, the stream ARN is looked up from the existing table instead.
     *
     * @param client    client used for the create and, if needed, describe call
     * @param tableName name of the table to create
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        AttributeDefinition idAttribute = AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build();
        List<AttributeDefinition> attributes = new ArrayList<>();
        attributes.add(idAttribute);

        KeySchemaElement partitionKey = KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH)
                .build();
        List<KeySchemaElement> schema = new ArrayList<>();
        schema.add(partitionKey);

        StreamSpecification streamSettings = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest request = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributes)
                .keySchema(schema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSettings)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse response = client.createTable(request).join();
            return response.tableDescription().latestStreamArn();
        } catch (Exception e) {
            boolean alreadyExists = e.getCause() instanceof ResourceInUseException;
            if (!alreadyExists) {
                throw e;
            }
            System.out.println("Table already exists.");
            return describeTable(client, tableName).table().latestStreamArn();
        }
    }

    /**
     * Describes a table and waits for the answer.
     *
     * @param client    client used for the call
     * @param tableName table to describe
     * @return the describe-table response
     */
    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        DescribeTableRequest request = DescribeTableRequest.builder()
                .tableName(tableName)
                .build();
        return client.describeTable(request).join();
    }

    /**
     * Issues a single scan request against a table and waits for the answer.
     *
     * @param dynamoDbClient client used for the call
     * @param tableName      table to scan
     * @return the scan response
     */
    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        ScanRequest request = ScanRequest.builder()
                .tableName(tableName)
                .build();
        return dynamoDbClient.scan(request).join();
    }

    /**
     * Writes an item consisting of the numeric {@code Id} and the string attribute
     * {@code attribute-1}.
     *
     * @param dynamoDbClient client used for the call
     * @param tableName      table to write to
     * @param id             value of the {@code Id} key, as a numeric string
     * @param val            value stored in {@code attribute-1}
     */
    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> attributes = idKey(id);
        attributes.put("attribute-1", AttributeValue.builder().s(val).build());
        putItem(dynamoDbClient, tableName, attributes);
    }

    /**
     * Writes the given attribute map as one item and waits for completion.
     *
     * @param dynamoDbClient client used for the call
     * @param tableName      table to write to
     * @param items          attribute name to value mapping of the item
     */
    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
            Map<String, AttributeValue> items) {
        PutItemRequest request = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(request).join();
    }

    /**
     * Sets the string attribute {@code attribute-2} on the item with the given id
     * and waits for completion.
     *
     * @param dynamoDbClient client used for the call
     * @param tableName      table containing the item
     * @param id             value of the {@code Id} key, as a numeric string
     * @param val            new value of {@code attribute-2}
     */
    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, String> names = new HashMap<>();
        names.put("#attr2", "attribute-2");

        Map<String, AttributeValue> values = new HashMap<>();
        values.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(idKey(id))
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(names)
                .expressionAttributeValues(values)
                .build();
        dynamoDbClient.updateItem(request).join();
    }

    /**
     * Deletes the item with the given id and waits for completion.
     *
     * @param dynamoDbClient client used for the call
     * @param tableName      table containing the item
     * @param id             value of the {@code Id} key, as a numeric string
     */
    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        DeleteItemRequest request = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(idKey(id))
                .build();
        dynamoDbClient.deleteItem(request).join();
    }

    /** Builds a fresh mutable map holding only the numeric {@code Id} attribute. */
    private static Map<String, AttributeValue> idKey(String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());
        return key;
    }
}