チュートリアル: DynamoDB Streams Kinesis Adapter
このセクションは、Amazon Kinesis Client Library と Amazon DynamoDB Streams Kinesis Adapter を使用する Java アプリケーションのチュートリアルです。アプリケーションには、データレプリケーションの例が表示されます。データレプリケーションでは、1 つのテーブルからの書き込みアクティビティが 2 番目のテーブルに適用され、両方のテーブルの内容が同期されます。ソースコードについては、「完成したプログラム: DynamoDB Streams Kinesis Adapter」を参照してください。
このプログラムでは、次のような処理を実行します。
-
KCL-Demo-src
とKCL-Demo-dst
という 2 つの DynamoDB テーブルを作成します。これらの各テーブルでは、ストリームが有効になっています。 -
項目を追加、更新、削除することで、ソーステーブルで更新アクティビティを生成します。これにより、データがテーブルのストリームに書き込まれます。
-
ストリーミングからレコードを読み込んで、DynamoDB リクエストとして再構築し、ターゲットテーブルにリクエストを適用します。
-
ソーステーブルとターゲットテーブルをスキャンし、内容が同じであることを確認します。
-
テーブルを削除してクリーンアップします。
これらのステップについては次のセクションで説明します。完成したアプリケーションは、チュートリアルの最後に示します。
トピック
ステップ 1: DynamoDB テーブルを作成する
最初のステップでは、2 つの DynamoDB テーブル (送信元テーブルと送信先テーブル) を作成します。ソーステーブルのストリームにある StreamViewType
は NEW_IMAGE
です。これは、このテーブルで項目が変更されると必ず、イメージの "後の" 項目がストリームに書き込まれることを意味します。このようにして、ストリームはテーブル内のすべての書き込みアクティビティを記録します。
次の例は、両方のテーブルを作成するためのコードを示しています。
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
ステップ 2: ソーステーブルに更新アクティビティを生成する
次のステップでは、ソーステーブルにいくつかの書き込みアクティビティを生成します。このアクティビティの実行中、ソーステーブルのストリームもほぼリアルタイムで更新されます。
アプリケーションは、データを書き込むための PutItem
、UpdateItem
、および DeleteItem
API オペレーションを呼び出すメソッドを持つヘルパークラスを定義します。次の例は、これらのメソッドの使用方法を示しています。
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
ステップ 3: ストリームを処理する
ここでは、プログラムがストリームの処理を開始します。DynamoDB Streams Kinesis Adapter は、低レベルの DynamoDB Streams コールを行わなくてもコードが KCL を十分に使用できるように、KCL と DynamoDB Streams エンドポイントの間の透過的なレイヤーとして機能します。このプログラムでは次のタスクを実行しています。
-
KCL インターフェイス定義に従ったメソッド(
StreamsRecordProcessor
、initialize
、processRecords
)を使用して、レコードプロセッサクラスshutdown
を定義します。processRecords
メソッドには、ソーステーブルのストリームからの読み込みとターゲットテーブルへの書き込みに必要なロジックが含まれています。 -
レコードプロセッサクラスのクラスファクトリを定義します(
StreamsRecordProcessorFactory
)。これは、KCL を使用する Java プログラムに必要です。 -
クラスファクトリに関連付けられた新しい KCL
Worker
をインスタンス化します。 -
レコード処理が完了すると、
Worker
をシャットダウンします。
KCL インターフェイス定義の詳細については、「Amazon Kinesis Data Streams デベロッパーガイド」の「Kinesis Client Library を使用したコンシューマーの開発」を参照してください。
次の例は、StreamsRecordProcessor
におけるメインループを示しています。case
ステートメントは、ストリームレコードに出現する OperationType
に基づいて、実行するアクションを決定します。
for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }
ステップ 4: 両方のテーブルの内容が同じであることを確認する
この時点で、ソーステーブルとターゲットテーブルの内容が同期されています。アプリケーションは、両方のテーブルに対して Scan
リクエストを発行し、内容が実際に同じであることを確認します。
DemoHelper
クラスには、低レベルの ScanTable
API を呼び出す Scan
メソッドが含まれています。次の例は、その使用方法を示しています。
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }
ステップ 5:クリーンアップ
デモは完了したため、アプリケーションによりソーステーブルとターゲットテーブルが削除されます。次のコード例を参照してください。テーブルが削除されても、そのストリームは最大 24 時間使用可能です。その後、自動的に削除されます。
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));