Démonstration : adaptateur Kinesis DynamoDB Streams - Amazon DynamoDB

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Démonstration : adaptateur Kinesis DynamoDB Streams

Cette section est une démonstration d'une application Java qui utilise la bibliothèque client Amazon Kinesis et l'adaptateur Amazon DynamoDB Streams. L'application illustre un exemple de la réplication de données, où l'activité d'écriture d'une table est appliquée à une seconde table, avec le contenu des deux tables demeurant synchronisé. Pour le code source, consultez Programme complet : adaptateur DynamoDB Streams Kinesis.

Le programme exécute les tâches suivantes :

  1. Crée deux tables DynamoDB nommées KCL-Demo-src et KCL-Demo-dst. Chacune de ces tables dispose d'un flux activé sur elle-même.

  2. Génère une activité de mise à jour de la table source en ajoutant, mettant à jour et supprimant des éléments. Cela entraîne l'écriture des données sur le flux de la table.

  3. Lit les registres du flux, les reconstruit en tant que demandes DynamoDB, et applique les demandes à la table de destination.

  4. Analyse les tables source et de destination afin de s'assurer que leurs contenus sont identiques.

  5. Elimine en supprimant les tables.

Ces étapes sont décrites dans les sections suivantes et l'application complète est illustrée à la fin de la procédure pas à pas.

Étape 1 : créer des tables DynamoDB

La première étape consiste à créer deux tables DynamoDB : une table source et une table de destination. Le StreamViewType sur le flux de la table source est NEW_IMAGE. Cela signifie que chaque fois qu'un élément est modifié dans la table, l'image « après » de l'élément est écrite dans le flux. De cette manière, le flux assure le suivi des toute l'activité d'écriture sur la table.

L'extrait de code suivant illustre le code utilisé pour la création des deux tables.

java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);

Étape 2 : générer une activité de mise à jour de la table source

L'étape suivante consiste à créer une activité d'écriture sur la table source. Tandis que cette activité a lieu, le flux de la table source est aussi mis à jour en quasi-temps réel.

L'application définit une classe d'assistance avec les méthodes qui appellent les actions d'API PutItem, UpdateItem et DeleteItem pour écrire les données. L'extrait de code suivant montre comment ces méthodes sont utilisées.

StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");

Étape 3 : traiter le flux

Maintenant le programme commence à traiter le flux. L'adaptateur DynamoDB Streams Kinesis agit comme une couche transparente entre la KCL et le point de terminaison DynamoDB Streams, afin que le code puisse pleinement exploiter la KCL au lieu de devoir effectuer des appels DynamoDB Streams de bas niveau. Le programme effectue les tâches suivantes :

  • Il définit une classe de processeur d'enregistrements, StreamsRecordProcessor, avec des méthodes conformes à la définition de l'interface KCL : initialize, processRecords et shutdown. La méthode processRecords contient la logique nécessaire pour lire à partir du flux de la table source et écrire dans la table de destination.

  • Il définit une fabrique de classe pour la classe de processeur d'enregistrements (StreamsRecordProcessorFactory). Cette action est obligatoire pour les programmes Java qui utilisent le KCL.

  • Il instancie un nouveau KCL Worker, associé à la fabrique de classe.

  • Il arrête le Worker lorsque le traitement des enregistrements est terminé.

Pour en savoir plus sur la définition de l'interface de la KCL, consultez Développement d'applications consommateur à l'aide de la bibliothèque client Kinesis dans le Guide du développeur Amazon Kinesis Data Streams.

L'extrait de code suivant illustre la boucle principale dans StreamsRecordProcessor. L'instruction case détermine l'action à exécuter, en fonction de l'OperationType qui s'affiche dans l'enregistrement de flux.

for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }

Étape 4 : s'assurer que les deux tables ont un contenu identique

À ce stade, le contenu des tables source et destination est synchronisé. L'application émet des demandes Scan sur les deux tables afin de vérifier que leurs contenus sont, en fait, identiques.

La classe DemoHelper contient une méthode ScanTable qui appelle l'API Scan de bas niveau. L'exemple suivant illustre la marche à suivre.

if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }

Étape 5 : nettoyer

Comme la démonstration est terminée, l'application supprime les tables source et destination. Consultez l'exemple de code suivant. Même après que les tables sont supprimées, leurs flux demeurent accessibles 24 heures, délai au-delà duquel ils sont automatiquement supprimés.

dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));