Amazon DocumentDB 및 Java를 사용한 이벤트 기반 프로그래밍 - Amazon DocumentDB

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon DocumentDB 및 Java를 사용한 이벤트 기반 프로그래밍

Amazon DocumentDB의 맥락에서 이벤트 기반 프로그래밍은 데이터베이스 변경이 후속 비즈니스 로직 및 프로세스를 트리거하는 기본 이벤트 생성기 역할을 하는 강력한 아키텍처 패턴을 나타냅니다. DocumentDB 컬렉션에 레코드가 삽입, 업데이트 또는 삭제되면 이러한 변경 사항은 다양한 다운스트림 프로세스, 알림 또는 데이터 동기화 작업을 자동으로 시작하는 이벤트 역할을 합니다. 이 패턴은 여러 애플리케이션 또는 서비스가 데이터 변경에 실시간으로 대응해야 하는 최신 분산 시스템에서 특히 유용합니다. DocumentDB에서 이벤트 기반 프로그래밍을 구현하는 주요 메커니즘은 변경 스트림입니다.

참고

이 안내서에서는 작업 중인 컬렉션에서 변경 스트림을 활성화했다고 가정합니다. 컬렉션에서 변경 스트림을 활성화하는 방법은 Amazon DocumentDB에서 변경 스트림 사용 섹션을 참조하세요.

Java 애플리케이션에서 변경 스트림 작업

MongoDB의 Java 드라이버에 있는 watch() 메서드는 Amazon DocumentDB의 실시간 데이터 변경 사항을 모니터링하는 기본 메커니즘입니다. watch() 메서드는 MongoClient, MongoDatabaseMongoCollection 객체에 의해에서 호출될 수 있습니다.

watch() 메서ChangeStreamIterable드는 업데이트에 대한 전체 문서 조회, 신뢰성을 위한 재개 토큰 및 타임스탬프 제공, 변경 사항 필터링을 위한 파이프라인 집계 단계 등 다양한 구성 옵션을 지원하는의 인스턴스를 반환합니다.

ChangeStreamIterable는 코어 Java 인터페이스를 구현Iterable하며와 함께 사용할 수 있습니다forEach(). 를 사용하여 이벤트를 캡처하려면 변경된 이벤트를 forEach() 처리하는에 콜백 함수를 forEach()전달합니다. 다음 코드 조각은 컬렉션에서 변경 스트림을 열어 변경 이벤트 모니터링을 시작하는 방법을 보여줍니다.

ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });

모든 변경 이벤트를 통과하는 또 다른 방법은 클러스터에 대한 연결을 유지하고 새 변경 이벤트가 발생할 때 지속적으로 수신하는 커서를 여는 것입니다. 변경 스트림 커서를 가져오려면 ChangeStreamIterable 객체의 cursor() 메서드를 사용합니다. 다음 코드 예제에서는 커서를 사용하여 변경 이벤트를 모니터링하는 방법을 보여줍니다.

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }

가장 좋은 방법은 try-with-resource 문MongoChangeStreamCursor에서를 생성하거나 커서를 수동으로 닫는 것입니다. 에서 cursor() 메서드를 호출하면 ChangeStreamDocument 객체를 통해 생성된 MongoChangeStreamCursorChangeStreamIterable 반환됩니다.

ChangeStreamDocument 클래스는 스트림의 개별 변경 이벤트를 나타내는 중요한 구성 요소입니다. 여기에는 작업 유형(삽입, 업데이트, 삭제, 교체), 문서 키, 네임스페이스 정보, 사용 가능한 경우 전체 문서 콘텐츠를 포함하여 각 수정에 대한 자세한 정보가 포함되어 있습니다. 클래스는 변경 유형을 getOperationType() 결정하고, 전체 문서 상태에 액세스하고, 수정된 문서를 getDocumentKey() 식별하는 등 변경 이벤트의 다양한 측면에 getFullDocument() 액세스하는 방법을 제공합니다.

ChangeStreamDocument 객체는 두 가지 중요한 정보인 재개 토큰과 변경 이벤트 시간을 제공합니다.

DocumentDB 변경 스트림에서 토큰 재개 및 시간 기반 작업은 연속성을 유지하고 과거 데이터 액세스를 관리하기 위한 중요한 메커니즘을 제공합니다. 재개 토큰은 각 변경 이벤트에 대해 생성된 고유 식별자로, 애플리케이션이 연결 해제 또는 실패 후 특정 지점에서 변경 스트림 처리를 다시 시작할 수 있도록 허용하는 북마크 역할을 합니다. 변경 스트림 커서가 생성되면 resumeAfter() 옵션을 통해 이전에 저장된 재개 토큰을 사용하여 처음부터 시작하거나 이벤트를 손실하는 대신 스트림이 중단된 위치에서 계속될 수 있습니다.

변경 스트림의 시간 기반 작업은 변경 이벤트 모니터링의 시작점을 관리하기 위한 다양한 접근 방식을 제공합니다. startAtOperationTime() 옵션을 사용하면 특정 타임스탬프에서 또는 그 이후에 발생한 변경 사항을 볼 수 있습니다. 이러한 시간 기반 기능은 과거 데이터 처리, point-in-time으로 복구 또는 시스템 간 동기화가 필요한 시나리오에서 특히 유용합니다.

다음 코드 예제에서는 삽입 문서와 연결된 이벤트를 검색하고 재개 토큰을 캡처한 다음 해당 토큰을 제공하여 삽입 이벤트 후 이벤트 모니터링을 시작합니다. 이벤트는 업데이트 이벤트와 연결되어 업데이트가 발생한 클러스터 시간을 가져오고 해당 타임스탬프를 추가 처리를 위한 시작점으로 사용합니다.

BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }

기본적으로 업데이트 변경 이벤트에는 전체 문서가 포함되지 않으며 수행된 변경 사항만 포함됩니다. 업데이트된 전체 문서에 액세스해야 하는 경우 ChangeStreamIterable 객체에서 fullDocument() 메서드를 호출할 수 있습니다. 업데이트 이벤트에 대해 전체 문서를 반환하도록 요청하면 스트림 변경 호출이 이루어진 시점에 존재하는 문서가 반환된다는 점에 유의하세요.

이 메서드는 열거형FullDocument을 파라미터로 사용합니다. 현재 Amazon DocumentDB는 DEFAULT 및 UPDATE_LOOKUP 값만 지원합니다. 다음 코드 조각은 변경 사항을 감시하기 시작할 때 업데이트 이벤트에 대한 전체 문서를 요청하는 방법을 보여줍니다.

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())