Amazon DocumentDB에서 변경 스트림 사용
Amazon DocumentDB(MongoDB 호환)의 변경 스트림 기능은 클러스터의 컬렉션 내에서 시간순으로 발생하는 변경 이벤트 시퀀스를 제공합니다. 변경 스트림에서 이벤트를 읽어 다음을 비롯한 다양한 사용 사례를 구현할 수 있습니다.
-
변경 알림
-
Amazon OpenSearch Service(OpenSearch Service)로 전체 텍스트 검색
-
Amazon Redshift로 분석
애플리케이션에서는 변경 스트림을 사용하여 개별 컬렉션의 데이터 변경 사항을 구독할 수 있습니다. 변경 스트림 이벤트는 클러스터에서 발생하는 순서대로 정렬되며 이벤트가 기록된 후 3시간(기본값) 동안 저장됩니다. 파라미터를 사용하여 보존 기간을 최대 7일까지 연장할 수 있습니다. change_stream_log_retention_duration
변경 스트림 보존 기간을 수정하려면 변경 스트림 로그 보존 기간 수정을 참조하세요.
주제
지원되는 연산자
Amazon DocumentDB는 변경 스트림에 대해 다음 작업을 지원합니다.
-
MongoDB
db.collection.watch()
,db.watch()
및client.watch()
API에서 지원되는 모든 변경 이벤트. -
업데이트를 위한 전체 문서 조회.
-
집계 단계:
$match
$project
,$redact
, 및$addFields
와$replaceRoot
. -
이력서 토큰에서 변경 스트림 재개
-
startAtOperation
항목을 사용하여 타임스탬프에서 변경 스트림 재개(Amazon DocumentDB 4.0+에 해당)
결제
Amazon DocumentDB 변경 스트림 기능은 기본적으로 비활성화되어 있으며 이 기능이 활성화될 때까지 추가 요금이 발생하지 않습니다. 클러스터에서 변경 스트림을 사용하면 추가 읽기 및 쓰기 IO와 스토리지 비용이 발생합니다. modifyChangeStreams
API 작업을 사용하여 클러스터에 대해 이 기능을 활성화할 수 있습니다. 요금에 대한 자세한 내용은 Amazon DocumentDB 요금
제한 사항
Amazon DocumentDB에서 변경 스트림에는 다음과 같은 제한 사항이 있습니다.
-
Amazon DocumentDB 3.6. 및 Amazon DocumentDB 4.0에서 변경 스트림은 Amazon DocumentDB 클러스터의 기본 인스턴스에 대한 연결에서만 열 수 있습니다. Amazon DocumentDB 3.6. 및 Amazon DocumentDB 4.0에서는 복제본 인스턴스의 변경 스트림에서 읽기 기능이 지원되지 않습니다.
watch()
API 작업을 호출할 때 모든 읽기가 기본 인스턴스에 대해 수행되도록primary
읽기 기본 설정을 지정해야 합니다(예제 섹션 참조). -
Amazon DocumentDB 5.0에서는 전역 클러스터를 포함하여 기본 인스턴스와 보조 인스턴스 모두에서 변경 스트림을 열 수 있습니다. 보조 읽기 기본 설정을 지정하여 변경 스트림을 보조 인스턴스로 리디렉션할 수 있습니다. 추가 모범 사례 및 제한 사항은 보조 인스턴스에서 변경 스트림 사용 항목을 참조하세요.
-
모음의 변경 스트림에 작성된 이벤트는 최대 7일 동안 사용할 수 있습니다(기본값은 3시간). 변경 스트림 데이터는 새 변경 사항이 발생하지 않은 경우에도 로그 보존 기간 후에 삭제됩니다.
-
모음에서
updateMany
또는deleteMany
같은 장기 실행 쓰기 작업을 수행하는 경우, 장기 실행 쓰기 작업이 완료될 때까지 변경 스트림 이벤트의 쓰기를 일시적으로 중단할 수 있습니다. -
Amazon DocumentDB는 MongoDB 작업 로그(
oplog
)를 지원하지 않습니다. -
Amazon DocumentDB를 사용하면 지정된 컬렉션에서 변경 스트림을 명시적으로 활성화해야 합니다.
-
변경 스트림 이벤트의 총 크기(변경 데이터 및 요청된 경우 전체 문서 포함)가
16 MB
보다 크면 클라이언트가 변경 스트림에서 읽기에 실패합니다. -
Amazon DocumentDB v3.6과
client.watch()
및db.watch()
항목을 사용할 때는 현재 Ruby 드라이버가 지원되지 않습니다. -
필드의 업데이트된 값이 이전 값과 동일한 경우 변경 스트림의
updateDescription
명령 출력은 Amazon DocumentDB와 MongoDB에서 다릅니다.제공된 필드가
$set
명령에 지정되어 있고 대상 값이 이미 소스 값과 동일한 경우 Amazon DocumentDB는updateDescription
출력의 필드를 반환하지 않습니다.MongoDB는 지정된 값이 현재 값과 같더라도 출력의 필드를 반환합니다.
변경 스트림 활성화
해당 데이터베이스 내의 모든 모음 또는 선택한 모음에 대해 Amazon DocumentDB 변경 스트림을 활성화할 수 있습니다. 다음은 mongo 쉘을 사용하여 다른 사용 사례에 변경 스트림을 활성화하는 방법의 예입니다. 데이터베이스 및 모음 이름을 지정할 때 빈 문자열은 와일드카드로 취급됩니다.
//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});
다음과 같은 경우 컬렉션에 대해 변경 스트림이 활성화됩니다.
-
데이터베이스와 컬렉션 모두 명시적으로 활성화되어 있습니다.
-
컬렉션을 포함하는 데이터베이스가 활성화되어 있습니다.
-
모든 데이터베이스가 활성화되어 있습니다.
상위 데이터베이스에도 변경 스트림이 활성화되어 있거나 클러스터의 모든 데이터베이스가 활성화되어 있는 경우 데이터베이스에서 컬렉션을 삭제해도 해당 컬렉션에 대한 변경 스트림이 비활성화되지 않습니다. 삭제된 컬렉션과 동일한 이름으로 새 컬렉션이 생성되면 해당 컬렉션에 대해 변경 스트림이 활성화됩니다.
$listChangeStreams
집계 파이프라인 단계를 사용하여 클러스터의 활성화된 모든 변경 스트림을 나열할 수 있습니다. Amazon DocumentDB에서 지원하는 모든 집계 단계는 추가 처리를 위해 파이프라인에서 사용할 수 있습니다. 이전에 활성화된 컬렉션이 비활성화된 경우 $listChangeStreams
출력에 나타나지 않습니다.
//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));
예: Python에서 변경 스트림 사용
다음은 컬렉션 수준의 Python에서 Amazon DocumentDB 변경 스트림을 사용하는 예입니다.
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """
다음은 데이터베이스 수준의 Python에서 Amazon DocumentDB 변경 스트림을 사용하는 예입니다.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """
전체 문서 조회
변경 사항 업데이트 이벤트에는 전체 문서가 포함되지 않으며 변경된 내용만 포함됩니다. 사용 사례에 업데이트의 영향을 받는 전체 문서가 필요한 경우, 스트림을 열 때 전체 문서 조회를 활성화할 수 있습니다.
변경 스트림 업데이트 이벤트에 대한 fullDocument
문서는 문서 조회 시 업데이트된 문서의 최신 버전을 나타냅니다. 업데이트 작업과 fullDocument
조회 사이에 변경 사항이 발생한 경우 fullDocument
문서가 업데이트 당시의 문서 상태를 나타내지 않을 수 있습니다.
업데이트 조회가 활성화된 스트림 객체를 생성하려면 다음 예제를 사용하세요.
stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next()
스트림 객체의 결과는 다음과 비슷합니다.
{'_id': {'_data': '015daf9b7c00000001010000000100009025'},
'clusterTime': Timestamp(1571789692, 1),
'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')},
'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3},
'ns': {'coll': 'foo', 'db': 'bar'},
'operationType': 'update',
'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}
변경 스트림 재개
마지막으로 검색된 변경 이벤트 문서의 _id
필드와 동일한 다시 시작 토큰을 사용하여 나중에 변경 스트림을 다시 시작할 수 있습니다.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """
startAtOperationTime
에서 변경 스트림 재개
startAtOperationTime
을 사용하여 특정 타임스탬프에서 나중에 변경 스트림을 재개할 수 있습니다.
참고
startAtOperationTime
을 사용하는 기능은 Amazon DocumentDB 4.0 이상에서 사용할 수 있습니다. startAtOperationTime
을 사용하는 경우, 변경 스트림 커서는 지정된 타임스탬프 시점 또는 이후에 발생한 변경 사항만 반환합니다. startAtOperationTime
및 resumeAfter
명령은 상호 배타적이므로 함께 사용할 수 없습니다.
import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """
변경 스트림 내 트랜잭션
변경 스트림 이벤트에는 커밋되지 않았거나 중단된 트랜잭션의 이벤트는 포함되지 않습니다. 예를 들어, 하나의 INSERT
작업과 하나의 UPDATE
작업으로 트랜잭션을 시작하는 경우 그리고. INSERT
작업은 성공했지만 UPDATE
작업이 실패하는 경우 트랜잭션은 롤백됩니다. 이 트랜잭션이 롤백되었으므로 변경 스트림에는 이 트랜잭션에 대한 이벤트가 포함되지 않습니다.
변경 스트림 로그 보존 기간 수정
AWS Management Console 또는 AWS CLI를 사용하여 변경 스트림 로그 보존 기간을 1시간~7일 사이로 수정할 수 있습니다.
참고
변경 스트림 로그 보존은 로그 크기가 (>) 51,200MB보다 커질 때까지 구성된 change_stream_log_retention_duration
값보다 오래된 로그를 삭제하지 않습니다.
보조 인스턴스에서 변경 스트림 사용
보조 인스턴스에서 변경 스트림 사용을 시작하려면 readPreference
항목을 보조 인스턴스로 사용하여 변경 스트림 커서를 엽니다.
변경 스트림 커서를 열어 클러스터 또는 데이터베이스의 특정 컬렉션 또는 모든 컬렉션에 대한 변경 이벤트를 감시할 수 있습니다. Amazon DocumentDB 인스턴스에서 변경 스트림 커서를 열고 라이터 인스턴스와 리더 인스턴스 모두에서 변경 스트림 문서를 가져올 수 있습니다. 라이터 및 리더 인스턴스에서 열린 다양한 변경 스트림 커서에서 변경 스트림 토큰(예: resumeToken
또는 startOperationTime
)을 공유할 수 있습니다.
예
import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Your password> clusterendpoint = "DocumentDBClusterEndpoint" client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] # Make sure to use SECONDARY to redirect cursor reads from secondary instances coll = db.get_collection('foo', read_preference=ReadPreference.SECONDARY) # Create a stream object on RO. The token needs to generated from PRIMARY. stream = coll.watch(resumeAfter=token) for event in stream: print(event)
보조 인스턴스의 변경 스트림에 대한 지침 및 제한 사항
스트림 변경 이벤트는 기본 인스턴스에서 보조 인스턴스로 복제해야 합니다. Amazon CloudWatch를 사용하여
DBInstanceReplicaLag
지표를 모니터링할 수도 있습니다.보조 인스턴스의 타임스탬프가 항상 기본 인스턴스와 동기화되지는 않을 수 있습니다. 이 경우 보조 인스턴스 타임스탬프가 지연될 수 있어 따라잡을 수 있습니다. 모범 사례로
startAtOperationTime
또는resumeToken
를 사용하여 보조 인스턴스에서 감시를 시작하는 것이 좋습니다.문서 크기가 크고,
fullDocumentLookup
작업을 수행 중이며, 기본 인스턴스에 동시 쓰기 워크로드가 많은 경우 기본 인스턴스에 비해 보조 인스턴스의 처리량이 낮은 상황을 경험할 수도 있습니다. 가장 좋은 방법은 보조 인스턴스에서 버퍼 캐시 적중률을 모니터링하고 버퍼 캐시 적중률이 높은지 확인하는 것입니다.