쿠키 기본 설정 선택

당사는 사이트와 서비스를 제공하는 데 필요한 필수 쿠키 및 유사한 도구를 사용합니다. 고객이 사이트를 어떻게 사용하는지 파악하고 개선할 수 있도록 성능 쿠키를 사용해 익명의 통계를 수집합니다. 필수 쿠키는 비활성화할 수 없지만 '사용자 지정' 또는 ‘거부’를 클릭하여 성능 쿠키를 거부할 수 있습니다.

사용자가 동의하는 경우 AWS와 승인된 제3자도 쿠키를 사용하여 유용한 사이트 기능을 제공하고, 사용자의 기본 설정을 기억하고, 관련 광고를 비롯한 관련 콘텐츠를 표시합니다. 필수가 아닌 모든 쿠키를 수락하거나 거부하려면 ‘수락’ 또는 ‘거부’를 클릭하세요. 더 자세한 내용을 선택하려면 ‘사용자 정의’를 클릭하세요.

연습: DynamoDB Streams Kinesis 어댑터

포커스 모드
연습: DynamoDB Streams Kinesis 어댑터 - Amazon DynamoDB

이번 단원에서는 Amazon Kinesis Client Library와 Amazon DynamoDB Streams Kinesis 어댑터를 사용하는 Java 애플리케이션에 대해 살펴보겠습니다. 이 애플리케이션은 한 테이블의 쓰기 작업이 두 번째 테이블에도 적용되면서 두 테이블의 내용이 동기화를 유지하는 데이터 복제의 예로 설명됩니다. 소스 코드는 전체 프로그램: DynamoDB Streams Kinesis 어댑터 섹션을 참조하세요.

이 프로그램에서는 다음 작업을 수행합니다.

  1. KCL-Demo-srcKCL-Demo-dst라는 이름의 DynamoDB 테이블 2개를 생성합니다. 두 테이블 모두 스트림이 활성화되어 있습니다.

  2. 항목을 추가, 업데이트 및 삭제하여 원본 테이블을 업데이트합니다. 이렇게 하면 데이터가 테이블의 스트림으로 기록됩니다.

  3. 스트림에서 레코드를 읽고 DynamoDB 요청으로 재작성한 다음 대상 테이블에 요청을 적용합니다.

  4. 원본 테이블과 대상 테이블을 스캔하여 내용이 동일한지 확인합니다.

  5. 두 테이블을 삭제합니다.

이러한 단계는 다음 단원에서 설명하며, 전체 애플리케이션은 연습 끝에 나와 있습니다.

1단계: DynamoDB 테이블 생성

첫 번째 단계에서 두 개의 DynamoDB 테이블(소스 테이블과 대상 테이블)을 생성합니다. 원본 테이블 스트림의 StreamViewTypeNEW_IMAGE입니다. 이 말은 원본 테이블 항목이 변경될 때마다 항목의 "사후" 이미지가 스트림에 기록된다는 것을 의미합니다. 이러한 방식으로 스트림이 테이블의 모든 쓰기 작업을 추적합니다.

다음은 두 테이블 생성에 사용된 코드를 보여주는 예제입니다.

java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);

2단계: 소스 테이블의 업데이트 활동 생성

다음 단계는 원본 테이블의 쓰기 작업입니다. 이 작업을 하면 원본 테이블의 스트림 역시 거의 실시간으로 업데이트됩니다.

이 애플리케이션은 데이터 기록을 위해 PutItem, UpdateItemDeleteItem API 작업을 호출하는 메서드를 사용하여 헬퍼 클래스를 정의합니다. 다음은 이러한 메서드의 사용 방법을 나타낸 코드 예제입니다.

StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");

3단계: 스트림 처리

이제 프로그램이 스트림을 처리합니다. DynamoDB Streams Kinesis 어댑터가 KCL과 DynamoDB Streams 엔드포인트 사이에서 투명 계층의 역할을 하기 때문에 코드에서 하위 수준 DynamoDB Streams를 호출할 필요 없이 KCL을 최대한 이용할 수 있습니다. 프로그램이 실행하는 작업은 다음과 같습니다.

  • KCL 인터페이스 정의를 준수하는 메서드인 initialize, processRecordsshutdown을 사용하여 레코드 프로세서 클래스인 StreamsRecordProcessor를 정의합니다. processRecords 메서드에는 원본 테이블의 스트림에서 데이터를 읽어 대상 테이블에 기록하는 데 필요한 로직이 저장됩니다.

  • 레코드 프로세서 클래스의 클래스 팩토리(StreamsRecordProcessorFactory)를 정의합니다. Java 프로그램이 KCL을 사용하려면 이 팩토리가 필요합니다.

  • 새로운 KCL Worker를 인스턴스화하여 클래스 팩토리와 연동시킵니다.

  • 레코드 처리가 완료되면 Worker를 종료합니다.

KCL 인터페이스 정의에 대한 자세한 내용은 Amazon Kinesis Data Streams 개발자 안내서Kinesis Client Library를 사용하여 소비자 개발을 참조하세요.

다음은 StreamsRecordProcessor의 메인 루프를 나타낸 코드 예제입니다. case 문은 스트림 레코드에 표시되는 OperationType에 따라 어떤 작업을 실행할지 결정합니다.

for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }

4단계: 양 테이블에 동일한 콘텐츠가 있는지 확인

이 시점에서는 원본 테이블과 대상 테이블의 내용이 동기화 상태를 유지합니다. 애플리케이션이 두 테이블에 대해 Scan 요청을 하여 내용이 실제로 동일한지 확인합니다.

DemoHelper 클래스에는 하위 수준 Scan API를 호출하는 ScanTable 메서드가 포함되어 있습니다. 다음 예제는 이 작업을 수행하는 방법을 보여줍니다.

if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }

5단계: 정리

데모가 완료되면 애플리케이션이 원본 테이블과 대상 테이블을 삭제합니다. 다음 코드 예제를 참조하십시오. 하지만 테이블이 삭제된 후에도 스트림은 최대 24시간까지 사용 가능하며, 이 시간이 지나면 자동 삭제됩니다.

dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
프라이버시사이트 이용 약관쿠키 기본 설정
© 2025, Amazon Web Services, Inc. 또는 계열사. All rights reserved.