이번 단원에서는 Amazon Kinesis Client Library와 Amazon DynamoDB Streams Kinesis 어댑터를 사용하는 Java 애플리케이션에 대해 살펴보겠습니다. 이 애플리케이션은 한 테이블의 쓰기 작업이 두 번째 테이블에도 적용되면서 두 테이블의 내용이 동기화를 유지하는 데이터 복제의 예로 설명됩니다. 소스 코드는 전체 프로그램: DynamoDB Streams Kinesis 어댑터 섹션을 참조하세요.
이 프로그램에서는 다음 작업을 수행합니다.
-
KCL-Demo-src
와KCL-Demo-dst
라는 이름의 DynamoDB 테이블 2개를 생성합니다. 두 테이블 모두 스트림이 활성화되어 있습니다. -
항목을 추가, 업데이트 및 삭제하여 원본 테이블을 업데이트합니다. 이렇게 하면 데이터가 테이블의 스트림으로 기록됩니다.
-
스트림에서 레코드를 읽고 DynamoDB 요청으로 재작성한 다음 대상 테이블에 요청을 적용합니다.
-
원본 테이블과 대상 테이블을 스캔하여 내용이 동일한지 확인합니다.
-
두 테이블을 삭제합니다.
이러한 단계는 다음 단원에서 설명하며, 전체 애플리케이션은 연습 끝에 나와 있습니다.
주제
1단계: DynamoDB 테이블 생성
첫 번째 단계에서 두 개의 DynamoDB 테이블(소스 테이블과 대상 테이블)을 생성합니다. 원본 테이블 스트림의 StreamViewType
은 NEW_IMAGE
입니다. 이 말은 원본 테이블 항목이 변경될 때마다 항목의 "사후" 이미지가 스트림에 기록된다는 것을 의미합니다. 이러한 방식으로 스트림이 테이블의 모든 쓰기 작업을 추적합니다.
다음은 두 테이블 생성에 사용된 코드를 보여주는 예제입니다.
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));
java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
// key
ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
.withWriteCapacityUnits(2L);
StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
.withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
.withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
2단계: 소스 테이블의 업데이트 활동 생성
다음 단계는 원본 테이블의 쓰기 작업입니다. 이 작업을 하면 원본 테이블의 스트림 역시 거의 실시간으로 업데이트됩니다.
이 애플리케이션은 데이터 기록을 위해 PutItem
, UpdateItem
및 DeleteItem
API 작업을 호출하는 메서드를 사용하여 헬퍼 클래스를 정의합니다. 다음은 이러한 메서드의 사용 방법을 나타낸 코드 예제입니다.
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
3단계: 스트림 처리
이제 프로그램이 스트림을 처리합니다. DynamoDB Streams Kinesis 어댑터가 KCL과 DynamoDB Streams 엔드포인트 사이에서 투명 계층의 역할을 하기 때문에 코드에서 하위 수준 DynamoDB Streams를 호출할 필요 없이 KCL을 최대한 이용할 수 있습니다. 프로그램이 실행하는 작업은 다음과 같습니다.
-
KCL 인터페이스 정의를 준수하는 메서드인
initialize
,processRecords
및shutdown
을 사용하여 레코드 프로세서 클래스인StreamsRecordProcessor
를 정의합니다.processRecords
메서드에는 원본 테이블의 스트림에서 데이터를 읽어 대상 테이블에 기록하는 데 필요한 로직이 저장됩니다. -
레코드 프로세서 클래스의 클래스 팩토리(
StreamsRecordProcessorFactory
)를 정의합니다. Java 프로그램이 KCL을 사용하려면 이 팩토리가 필요합니다. -
새로운 KCL
Worker
를 인스턴스화하여 클래스 팩토리와 연동시킵니다. -
레코드 처리가 완료되면
Worker
를 종료합니다.
KCL 인터페이스 정의에 대한 자세한 내용은 Amazon Kinesis Data Streams 개발자 안내서의 Kinesis Client Library를 사용하여 소비자 개발을 참조하세요.
다음은 StreamsRecordProcessor
의 메인 루프를 나타낸 코드 예제입니다. case
문은 스트림 레코드에 표시되는 OperationType
에 따라 어떤 작업을 실행할지 결정합니다.
for (Record record : records) {
String data = new String(record.getData().array(), Charset.forName("UTF-8"));
System.out.println(data);
if (record instanceof RecordAdapter) {
com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record)
.getInternalObject();
switch (streamRecord.getEventName()) {
case "INSERT":
case "MODIFY":
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
streamRecord.getDynamodb().getNewImage());
break;
case "REMOVE":
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
streamRecord.getDynamodb().getKeys().get("Id").getN());
}
}
checkpointCounter += 1;
if (checkpointCounter % 10 == 0) {
try {
checkpointer.checkpoint();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
4단계: 양 테이블에 동일한 콘텐츠가 있는지 확인
이 시점에서는 원본 테이블과 대상 테이블의 내용이 동기화 상태를 유지합니다. 애플리케이션이 두 테이블에 대해 Scan
요청을 하여 내용이 실제로 동일한지 확인합니다.
DemoHelper
클래스에는 하위 수준 Scan
API를 호출하는 ScanTable
메서드가 포함되어 있습니다. 다음 예제는 이 작업을 수행하는 방법을 보여줍니다.
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
.equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
System.out.println("Scan result is equal.");
}
else {
System.out.println("Tables are different!");
}
5단계: 정리
데모가 완료되면 애플리케이션이 원본 테이블과 대상 테이블을 삭제합니다. 다음 코드 예제를 참조하십시오. 하지만 테이블이 삭제된 후에도 스트림은 최대 24시간까지 사용 가능하며, 이 시간이 지나면 자동 삭제됩니다.
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));