Tutorial: Adaptador Kinesis de DynamoDB Streams
En esta sección se explica paso a paso una aplicación Java en la que se utiliza Amazon Kinesis Client Library y Amazon DynamoDB Streams Kinesis Adapter. En la aplicación se muestra un ejemplo de replicación de datos, donde la actividad de escritura de una tabla se aplica a una segunda tabla, de tal forma que el contenido de ambas se mantiene sincronizado. Para obtener el código fuente, consulte Programa completo: DynamoDB Streams Kinesis Adapter.
El programa realiza lo siguiente:
-
Crea dos tablas de DynamoDB denominadas
KCL-Demo-src
yKCL-Demo-dst
. En cada una de estas tablas se ha habilitado una secuencia. -
Agrega, actualiza y elimina elementos para generar actividad de actualización en la tabla de origen. Esto hace que se escriban datos en la secuencia de la tabla.
-
Lee los registros en la transmisión, los reconstruye como solicitudes de DynamoDB y aplica las solicitudes a la tabla de destino.
-
Examina las tablas de origen y destino para comprobar que sus contenidos sean idénticos.
-
Efectúa una limpieza eliminando las tablas.
Estos pasos se describen en las siguientes secciones y la aplicación completa se muestra al final del tutorial.
Temas
Paso 1: crear tablas de DynamoDB
El primer paso consiste en crear dos tablas de DynamoDB, una de origen y una de destino. El StreamViewType
de la secuencia de la tabla de origen es NEW_IMAGE
. Esto significa que cada vez que se modifica un elemento en esta tabla, su imagen de "después" se escribe en la secuencia. De esta forma, se realiza un seguimiento en la secuencia de todas las actividades de escritura en la tabla.
En el siguiente ejemplo se muestra el código utilizado para crear las dos tablas.
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
Paso 2: generar actividad de actualización en la tabla de origen
El siguiente paso consiste en generar actividad de escritura en la tabla de origen. Mientras tiene lugar esta actividad, la secuencia de la tabla de origen también se actualiza casi en tiempo real.
En la aplicación se define una clase auxiliar con métodos que llaman a las operaciones de API PutItem
, UpdateItem
y DeleteItem
para escribir los datos. En el siguiente ejemplo se muestra cómo se utilizan estos métodos.
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
Paso 3: procesar la secuencia
Ahora, el programa comienza a procesar la secuencia. DynamoDB Streams Kinesis Adapter actúa como una capa transparente entre la KCL y el punto de enlace de DynamoDB Streams, para que el código pueda utilizar plenamente la KCL, en lugar de tener que realizar llamadas a DynamoDB Streams de bajo nivel. En el programa se realizan las siguientes tareas:
-
Se define una clase de procesador de registros,
StreamsRecordProcessor
, con métodos que cumplen con la definición de interfaz de KCL:initialize
,processRecords
yshutdown
. El métodoprocessRecords
contiene la lógica necesaria para leer la secuencia de la tabla de origen y escribir en la tabla de destino. -
Define un generador de clases para la clase de procesador de registros (
StreamsRecordProcessorFactory
). Esto es necesario para los programas Java que utilizan la KCL. -
Crea una nueva instancia del proceso de trabajo
Worker
de la KCL, asociado con el generador de clases. -
Cierra el proceso de trabajo
Worker
cuando ha finalizado de procesar registros.
Para obtener más información sobre la definición de la interfaz de KCL, consulte Desarrollo de consumidores mediante la biblioteca Kinesis Client Library en la Guía de desarrolladores de Amazon Kinesis Data Streams.
En el siguiente ejemplo se muestra el bucle principal de StreamsRecordProcessor
. La instrucción case
determina qué acción se debe llevar a cabo, según el valor de OperationType
que aparece en el registro de secuencia.
for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }
Paso 4: comprobar que el contenido de ambas tablas es idéntico
En este punto, el contenido de las tablas de origen y destino está sincronizado. La aplicación emite solicitudes Scan
en las dos tablas para comprobar que su contenido sea realmente idéntico.
La clase DemoHelper
contiene un método ScanTable
que llama a la API de bajo nivel Scan
. El siguiente ejemplo le muestra cómo se usa.
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }
Paso 5: Eliminar
La demostración ha finalizado. Por consiguiente, la aplicación elimina las tablas de origen y destino. Consulte el siguiente ejemplo de código. Incluso después de que las tablas se hayan eliminado, sus secuencias permanecerán disponibles durante un máximo de 24 horas; transcurrido este periodo se eliminan automáticamente.
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));