Walkthrough: DynamoDB-Streams-Kinesis-Adapter - Amazon-DynamoDB

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Walkthrough: DynamoDB-Streams-Kinesis-Adapter

In diesem Abschnitt wird eine Anleitung für eine Java-Anwendung gegeben, die die Amazon-Kinesis-Client-Library und den Amazon-DynamoDB-Streams-Kinesis-Adapter verwendet. Die Anwendung zeigt ein Beispiel für die Datenreplikation, wobei Schreibaktivitäten einer Tabelle auf eine zweite Tabelle angewendet werden und die Inhalte beider Tabellen synchron bleiben. Sie finden den Quellcode unter Vollständiges Programm: DynamoDB-Streams-Kinesis-Adapter.

Das Programm führt Folgendes aus:

  1. Erstellt zwei DynamoDB-Tabellen namens KCL-Demo-src und KCL-Demo-dst. Für jede dieser Tabellen ist ein Stream aktiviert.

  2. Generiert Aktualisierungsaktivitäten in der Quelltabelle durch Hinzufügen, Aktualisieren und Löschen von Elementen. Dies bewirkt, dass Daten in den Tabellenstream geschrieben werden.

  3. Liest die Datensätze aus dem Stream, rekonstruiert diese als DynamoDB-Anforderungen und wendet die Anforderungen auf die Zieltabelle an.

  4. Scannt die Quell- und Zieltabellen, um sicherzustellen, dass ihre Inhalte identisch sind.

  5. Bereinigt die Daten durch Löschen der Tabellen.

Diese Schritte werden in den folgenden Abschnitten beschrieben und die vollständige Anwendung wird am Ende der Anleitung angezeigt.

Schritt 1: Erstellen einer DynamoDB-Tabelle

Im ersten Schritt erstellen Sie zwei DynamoDB-Tabellen—eine Quelltabelle und eine Zieltabelle. Der StreamViewType des Streams der Quelltabelle lautet NEW_IMAGE. Das bedeutet, dass sobald ein Element in dieser Tabelle geändert wird, das Image des Elements nach der Änderung in den Stream geschrieben wird. So verfolgt der Stream alle Schreibaktivitäten der Tabelle.

Das folgende Beispiel zeigt den Code für das Erstellen von beiden Tabellen.

java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);

Schritt 2: Generieren von Aktualisierungsaktivitäten in der Quelltabelle

Im nächsten Schritt erstellen Sie einige Schreibaktivitäten in der Quelltabelle. Während diese Aktivitäten ausgeführt werden, wird der Stream der Quelltabelle nahezu in Echtzeit ebenfalls aktualisiert.

Die Anwendung definiert eine Hilfsklasse mit Methoden, die die DeleteItem API Operationen PutItemUpdateItem, und zum Schreiben der Daten aufrufen. Das folgende Codebeispiel zeigt, wie diese Methoden verwendet werden.

StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");

Schritt 3: Verarbeiten des Streams

Das Programm beginnt mit der Verarbeitung des Streams. Der DynamoDB Streams Kinesis Adapter fungiert als transparente Ebene zwischen dem DynamoDB Streams-Endpunkt KCL und dem DynamoDB Streams Streams-Endpunkt, sodass der Code vollständig genutzt werden kann, KCL anstatt DynamoDB Streams Streams-Aufrufe auf niedriger Ebene tätigen zu müssen. Das Programm führt die folgenden Aufgaben durch:

  • Er definiert eine Datensatzprozessorklasse,, mit MethodenStreamsRecordProcessor, die der Schnittstellendefinition entsprechen:, und. KCL initialize processRecords shutdown Die processRecords-Methode enthält die Logik, die zum Lesen von der Quelltabelle des Streams und zum Schreiben in die Zieltabelle erforderlich ist.

  • Sie definiert eine ClassFactory für die Datensatzprozessor-Klasse (StreamsRecordProcessorFactory). Dies ist für Java-Programme erforderlich, die das verwendenKCL.

  • Es instanziiert eine neue KCLWorker, die der Klassenfabrik zugeordnet ist.

  • Sie fährt Worker herunter, wenn die Datensatzverarbeitung abgeschlossen ist.

Weitere Informationen zur KCL Schnittstellendefinition finden Sie unter Developing Consumer using the Kinesis Client Library im Amazon Kinesis Data Streams Developer Guide.

Das folgende Codebeispiel zeigt die Hauptschleife in StreamsRecordProcessor. Die case-Anweisung bestimmt, welche Aktion basierend auf dem OperationType, der im Stream-Datensatz erscheint, durchgeführt werden soll.

for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }

Schritt 4: Sicherstellen, dass beide Tabellen über identische Inhalte verfügen

An diesem Punkt sind die Inhalte der Quell- und Zieltabellen synchronisiert. Die Anwendung gibt Scan-Anforderungen für beide Tabellen aus, um sicherzustellen, dass ihre Inhalte identisch sind.

Die DemoHelper Klasse enthält eine ScanTable Methode, die das Low-Level aufruft. Scan API Das Verfahren wird im folgenden Beispiel beschrieben.

if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }

Schritt 5: Bereinigen

Die Demonstration ist abgeschlossen, so dass die Anwendung Quell- und Zieltabellen löscht. Beachten Sie hierzu das folgende Codebeispiel. Nachdem die Tabellen gelöscht wurden, bleiben die Streams für bis zu 24 Stunden verfügbar. Anschließend werden sie automatisch gelöscht.

dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));