逐步解說:DynamoDB Streams Kinesis 轉接器 - Amazon DynamoDB

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

逐步解說:DynamoDB Streams Kinesis 轉接器

本節會逐步解說使用 Amazon Kinesis Client Library 與 Amazon DynamoDB Streams Kinesis 轉接器的 Java 應用程式。此應用程式示範資料複寫的範例,將一份資料表的寫入活動套用至第二份資料表,讓兩份資料表的內容保持同步。如需來源碼,請參閱「完整程式:DynamoDB Streams Kinesis 轉接器」。

此程式執行下列操作:

  1. 建立兩個 DynamoDB 資料表,並命名為 KCL-Demo-srcKCL-Demo-dst。這些資料表各會啟用串流。

  2. 在來源資料表中新增、更新與刪除項目,以產生更新活動。這會導致將資料寫入資料表的串流。

  3. 從串流讀取紀錄、將其重新建構為 DynamoDB 請求,再將請求套用至目標資料表。

  4. 掃描來源與目標資料表,以確定其內容相同。

  5. 刪除資料表以清除。

下列章節將說明這些步驟,並在演練結尾顯示完整的應用程式。

步驟 1:建立 DynamoDB 資料表

第一步是建立兩個 DynamoDB 資料表:一個來源資料表與一個目標資料表。來源資料表串流中的 StreamViewTypeNEW_IMAGE。這表示每當在此資料表中修改項目時,項目的「修改後」影像就會寫入串流。串流可透過此方法追蹤資料表上的所有寫入活動。

下列範例顯示用來建立這兩份資料表的程式碼。

java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);

步驟 2:在來源資料表中產生更新活動

下一步是在來源資料表上產生一些寫入活動。在此活動進行期間,來源資料表的串流也會近乎即時地更新。

應用程式會使用呼叫 PutItemUpdateItemDeleteItemAPI操作以寫入資料的方法來定義協助程式類別。下列程式碼範例示範如何使用這些方法。

StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");

步驟 3:處理串流

程式現在會開始處理串流。DynamoDB Streams Kinesis Adapter 充當 KCL與 DynamoDB Streams 端點之間的透明層,因此程式碼可以完全使用,KCL而不必進行低階 DynamoDB Streams 呼叫。此程式會執行下列任務:

  • 它使用符合KCL介面定義StreamsRecordProcessor的方法來定義記錄處理器類別:initializeprocessRecordsshutdownprocessRecords 方法包含從來源資料表串流讀取與寫入目標資料表所需的邏輯。

  • 它會定義紀錄處理器類別的類別處理站 (StreamsRecordProcessorFactory)。使用 的 Java 程式需要此設定KCL。

  • 它會初始化與類別工廠Worker相關聯的新 KCL 。

  • 它會在紀錄處理完成時關閉 Worker

若要進一步了解KCL介面定義,請參閱 Amazon Kinesis Kinesis Data Streams 開發人員指南 中的使用 Kinesis 用戶端程式庫開發消費者

下列程式碼範例顯示 StreamsRecordProcessor 中的主迴圈。case 陳述式會根據串流紀錄中顯示的 OperationType 來決定要執行的動作。

for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }

步驟 4:確定這兩個資料表的內容相同

此時,來源與目標資料表的內容會同步。此應用程式會對這兩個資料表發出 Scan 請求,以確認其內容事實上相同。

DemoHelper 類別包含呼叫低階 Scan ScanTable的方法API。下列範例示範其使用方法。

if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }

步驟 5:清除

示範已完成,因此應用程式會刪除來源與目標資料表。請參閱以下程式碼範例。即使在刪除資料表後,其串流也會保持可用長達 24 小時,然後就自動刪除。

dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));