Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Démonstration : adaptateur Kinesis DynamoDB Streams
Cette section est une démonstration d'une application Java qui utilise la bibliothèque client Amazon Kinesis et l'adaptateur Amazon DynamoDB Streams. L'application illustre un exemple de la réplication de données, où l'activité d'écriture d'une table est appliquée à une seconde table, avec le contenu des deux tables demeurant synchronisé. Pour le code source, consultez Programme complet : adaptateur DynamoDB Streams Kinesis.
Le programme exécute les tâches suivantes :
-
Crée deux tables DynamoDB nommées
KCL-Demo-src
etKCL-Demo-dst
. Chacune de ces tables dispose d'un flux activé sur elle-même. -
Génère une activité de mise à jour de la table source en ajoutant, mettant à jour et supprimant des éléments. Cela entraîne l'écriture des données sur le flux de la table.
-
Lit les registres du flux, les reconstruit en tant que demandes DynamoDB, et applique les demandes à la table de destination.
-
Analyse les tables source et de destination afin de s'assurer que leurs contenus sont identiques.
-
Elimine en supprimant les tables.
Ces étapes sont décrites dans les sections suivantes et l'application complète est illustrée à la fin de la procédure pas à pas.
Rubriques
Étape 1 : créer des tables DynamoDB
La première étape consiste à créer deux tables DynamoDB : une table source et une table de destination. Le StreamViewType
sur le flux de la table source est NEW_IMAGE
. Cela signifie que chaque fois qu'un élément est modifié dans la table, l'image « après » de l'élément est écrite dans le flux. De cette manière, le flux assure le suivi des toute l'activité d'écriture sur la table.
L'extrait de code suivant illustre le code utilisé pour la création des deux tables.
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
Étape 2 : générer une activité de mise à jour de la table source
L'étape suivante consiste à créer une activité d'écriture sur la table source. Tandis que cette activité a lieu, le flux de la table source est aussi mis à jour en quasi-temps réel.
L'application définit une classe d'assistance avec des méthodes qui appellent le PutItem
UpdateItem
, et DeleteItem
API des opérations pour écrire les données. L'extrait de code suivant montre comment ces méthodes sont utilisées.
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
Étape 3 : traiter le flux
Maintenant le programme commence à traiter le flux. L'adaptateur DynamoDB Streams Kinesis agit comme une couche transparente entre le point de terminaison DynamoDB Streams et KCL le point de terminaison DynamoDB Streams, de sorte que le code peut être pleinement utilisé au lieu d'avoir à effectuer des appels DynamoDB Streams KCL de bas niveau. Le programme effectue les tâches suivantes :
-
Il définit une classe de processeur d'enregistrement
StreamsRecordProcessor
, avec des méthodes conformes à la définition de l'KCLinterface :initialize
processRecords
, etshutdown
. La méthodeprocessRecords
contient la logique nécessaire pour lire à partir du flux de la table source et écrire dans la table de destination. -
Il définit une fabrique de classe pour la classe de processeur d'enregistrements (
StreamsRecordProcessorFactory
). Cela est nécessaire pour les programmes Java qui utilisent leKCL. -
Il instancie un nouveau KCL
Worker
, qui est associé à la classe factory. -
Il arrête le
Worker
lorsque le traitement des enregistrements est terminé.
Pour en savoir plus sur la définition de l'KCLinterface, consultez la section Developing consumers using the Kinesis Client Library dans le manuel Amazon Kinesis Data Streams Developer Guide.
L'extrait de code suivant illustre la boucle principale dans StreamsRecordProcessor
. L'instruction case
détermine l'action à exécuter, en fonction de l'OperationType
qui s'affiche dans l'enregistrement de flux.
for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }
Étape 4 : s'assurer que les deux tables ont un contenu identique
À ce stade, le contenu des tables source et destination est synchronisé. L'application émet des demandes Scan
sur les deux tables afin de vérifier que leurs contenus sont, en fait, identiques.
La DemoHelper
classe contient une ScanTable
méthode qui appelle le bas niveau. Scan
API L'exemple suivant illustre la marche à suivre.
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }
Étape 5 : nettoyer
Comme la démonstration est terminée, l'application supprime les tables source et destination. Consultez l'exemple de code suivant. Même après que les tables sont supprimées, leurs flux demeurent accessibles 24 heures, délai au-delà duquel ils sont automatiquement supprimés.
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));