Program lengkap: Adaptor DynamoDB Streams Kinesis - Amazon DynamoDB

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Program lengkap: Adaptor DynamoDB Streams Kinesis

Berikut ini adalah program Java lengkap yang melakukan tugas-tugas yang dijelaskan dalam Panduan: Adaptor DynamoDB Streams Kinesis. Saat menjalankannya, Anda akan melihat output yang serupa dengan berikut ini.

Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
Penting

Untuk menjalankan program ini, pastikan aplikasi klien memiliki akses ke DynamoDB dan Amazon CloudWatch melalui kebijakan. Untuk informasi selengkapnya, lihat Kebijakan berbasis identitas untuk DynamoDB.

Kode sumber terdiri dari empat file .java:

  • StreamsAdapterDemo.java

  • StreamsRecordProcessor.java

  • StreamsAdapterDemoProcessorFactory.java

  • StreamsAdapterDemoHelper.java

StreamsAdapterDemo.java

package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.processor.StreamTracker; import software.amazon.kinesis.retrieval.RetrievalConfig; public class StreamsAdapterDemo { private static DynamoDbAsyncClient dynamoDbAsyncClient; private static CloudWatchAsyncClient cloudWatchAsyncClient; private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient; private static String tablePrefix = "KCL-Demo"; private static String streamArn; private static Region region = Region.US_EAST_1; private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); public static void main( String[] args ) throws Exception { System.out.println("Starting demo..."); dynamoDbAsyncClient = DynamoDbAsyncClient.builder() .credentialsProvider(credentialsProvider) .region(region) .build(); cloudWatchAsyncClient = CloudWatchAsyncClient.builder() .credentialsProvider(credentialsProvider) 
.region(region) .build(); amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region); String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; setUpTables(); StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)); ShardRecordProcessorFactory shardRecordProcessorFactory = new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable); ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, "streams-adapter-demo", amazonDynamoDbStreamsAdapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, "streams-demo-worker", shardRecordProcessorFactory ); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); System.out.println("Creating scheduler for stream " + streamArn); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, amazonDynamoDbStreamsAdapterClient ); System.out.println("Starting scheduler..."); Thread t = new Thread(scheduler); t.start(); Thread.sleep(250000); System.out.println("Stopping scheduler..."); scheduler.shutdown(); t.join(); if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); } System.out.println("Done."); cleanupAndExit(0); } private static void setUpTables() { String srcTable = tablePrefix + "-src"; String destTable = 
tablePrefix + "-dest"; streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable); StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable); awaitTableCreation(srcTable); performOps(srcTable); } private static void awaitTableCreation(String tableName) { Integer retries = 0; Boolean created = false; while (!created && retries < 100) { DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName); created = result.table().tableStatusAsString().equals("ACTIVE"); if (created) { System.out.println("Table is active."); return; } else { retries++; try { Thread.sleep(1000); } catch (InterruptedException e) { // do nothing } } } System.out.println("Timeout after table creation. Exiting..."); cleanupAndExit(1); } private static void performOps(String tableName) { StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102"); } private static void cleanupAndExit(Integer returnValue) { String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build()); dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build()); System.exit(returnValue); } }

StreamsRecordProcessor.java

package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.Record; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { private Integer checkpointCounter; private final DynamoDbAsyncClient dynamoDbAsyncClient; private final String tableName; public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) { this.dynamoDbAsyncClient = dynamoDbAsyncClient; this.tableName = tableName; } @Override public void initialize(InitializationInput initializationInput) { this.checkpointCounter = 0; } @Override public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) { for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) { String data = new String(record.data().array(), StandardCharsets.UTF_8); System.out.println(data); Record streamRecord = record.getRecord(); switch (streamRecord.eventName()) { case INSERT: case MODIFY: StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, streamRecord.dynamodb().newImage()); case REMOVE: StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, 
streamRecord.dynamodb().keys().get("Id").n()); } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { System.out.println("Lease Lost"); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { e.printStackTrace(); } } }

StreamsAdapterDemoProcessorFactory.java

package com.amazonaws.codesamples; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory { private final String tableName; private final DynamoDbAsyncClient dynamoDbAsyncClient; public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) { this.tableName = tableName; this.dynamoDbAsyncClient = asyncClient; } @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName); } }

StreamsAdapterDemoHelper.java

package com.amazonaws.codesamples; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse; import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse; import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement; import software.amazon.awssdk.services.dynamodb.model.KeyType; import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput; import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException; import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import software.amazon.awssdk.services.dynamodb.model.StreamSpecification; import software.amazon.awssdk.services.dynamodb.model.StreamViewType; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class StreamsAdapterDemoHelper { /** * @return StreamArn */ public static String createTable(DynamoDbAsyncClient client, String tableName) { List<AttributeDefinition> attributeDefinitions = new ArrayList<>(); attributeDefinitions.add(AttributeDefinition.builder() .attributeName("Id") .attributeType("N") .build()); List<KeySchemaElement> keySchema = new ArrayList<>(); keySchema.add(KeySchemaElement.builder() .attributeName("Id") .keyType(KeyType.HASH) // Partition key .build()); 
StreamSpecification streamSpecification = StreamSpecification.builder() .streamEnabled(true) .streamViewType(StreamViewType.NEW_IMAGE) .build(); CreateTableRequest createTableRequest = CreateTableRequest.builder() .tableName(tableName) .attributeDefinitions(attributeDefinitions) .keySchema(keySchema) .billingMode(BillingMode.PAY_PER_REQUEST) .streamSpecification(streamSpecification) .build(); try { System.out.println("Creating table " + tableName); CreateTableResponse result = client.createTable(createTableRequest).join(); return result.tableDescription().latestStreamArn(); } catch (Exception e) { if (e.getCause() instanceof ResourceInUseException) { System.out.println("Table already exists."); return describeTable(client, tableName).table().latestStreamArn(); } throw e; } } public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) { return client.describeTable(DescribeTableRequest.builder() .tableName(tableName) .build()) .join(); } public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) { return dynamoDbClient.scan(ScanRequest.builder() .tableName(tableName) .build()) .join(); } public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) { Map<String, AttributeValue> item = new HashMap<>(); item.put("Id", AttributeValue.builder().n(id).build()); item.put("attribute-1", AttributeValue.builder().s(val).build()); putItem(dynamoDbClient, tableName, item); } public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, Map<String, AttributeValue> items) { PutItemRequest putItemRequest = PutItemRequest.builder() .tableName(tableName) .item(items) .build(); dynamoDbClient.putItem(putItemRequest).join(); } public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) { Map<String, AttributeValue> key = new HashMap<>(); key.put("Id", AttributeValue.builder().n(id).build()); Map<String, String> 
expressionAttributeNames = new HashMap<>(); expressionAttributeNames.put("#attr2", "attribute-2"); Map<String, AttributeValue> expressionAttributeValues = new HashMap<>(); expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build()); UpdateItemRequest updateItemRequest = UpdateItemRequest.builder() .tableName(tableName) .key(key) .updateExpression("SET #attr2 = :val") .expressionAttributeNames(expressionAttributeNames) .expressionAttributeValues(expressionAttributeValues) .build(); dynamoDbClient.updateItem(updateItemRequest).join(); } public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) { Map<String, AttributeValue> key = new HashMap<>(); key.put("Id", AttributeValue.builder().n(id).build()); DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder() .tableName(tableName) .key(key) .build(); dynamoDbClient.deleteItem(deleteItemRequest).join(); } }