Amazon DocumentDB에서 변경 스트림 사용 - Amazon DocumentDB

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon DocumentDB에서 변경 스트림 사용

의 변경 스트림 기능은 클러스터의 컬렉션 내에서 발생하는 변경 이벤트 시퀀스를 시간 순서대로 제공합니다.Amazon DocumentDB(MongoDB와 호환) 변경 스트림에서 이벤트를 읽어 다음을 비롯한 다양한 사용 사례를 구현할 수 있습니다.

  • 변경 알림

  • Amazon Elasticsearch Service(Amazon ES)를 사용하여 전체 텍스트 검색

  • Amazon Redshift를 사용하여 분석

애플리케이션에서는 변경 스트림을 사용하여 개별 컬렉션의 데이터 변경 사항을 구독할 수 있습니다. 변경 스트림 이벤트는 클러스터에서 발생하는 순서대로 정렬되며 이벤트가 기록된 후 3시간(기본값) 동안 저장됩니다. 파라미터를 사용하여 보존 기간을 최대 7일까지 연장할 수 있습니다.change_stream_log_retention_duration 변경 스트림 보존 기간을 수정하려면 변경 스트림 로그 보존 기간 수정을 참조하십시오.

지원되는 작업

Amazon DocumentDB는 변경 스트림에 대해 다음 작업을 지원합니다.

  • MongoDB, db.collection.watch()db.watch() API에서 지원되는 모든 변경 이벤트.client.watch()

  • 업데이트를 위한 전체 문서 조회.

  • 집계 단계: $match , $project, $redact$addFields.$replaceRoot

  • 다시 시작 토큰에서 변경 스트림 재개

  • (startAtOperation v4.0+에 적용 가능)를 사용하여 타임스탬프에서 변경 스트림 재개Amazon DocumentDB

Billing

변경 스트림 기능은 기본적으로 비활성화되어 있으며 이 기능이 활성화될 때까지 추가 요금이 발생하지 않습니다.Amazon DocumentDB 클러스터에서 변경 스트림을 사용하면 추가 읽기 및 쓰기 IOs 및 스토리지 비용이 발생합니다. API 작업을 사용하여 클러스터에 대해 이 기능을 활성화할 수 있습니다.modifyChangeStreams 요금에 대한 자세한 내용은 Amazon DocumentDB 요금을 참조하십시오.

Limitations

Amazon DocumentDB에서 변경 스트림에는 다음과 같은 제한 사항이 있습니다.

  • 변경 스트림은 Amazon DocumentDB 클러스터의 기본 인스턴스에 대한 연결에서만 열 수 있습니다. 현재 복제본 인스턴스의 변경 스트림에서 읽기는 지원되지 않습니다. watch() API 작업을 호출할 때 모든 읽기가 기본 인스턴스에 대해 수행되도록 primary 읽기 기본 설정을 지정해야 합니다(예제 단원 참조).

  • 컬렉션의 변경 스트림에 기록된 이벤트는 최대 7일 동안 사용할 수 있습니다(기본값은 3시간). 변경 스트림 데이터는 새 변경 사항이 발생하지 않은 경우에도 로그 보존 기간 후에 삭제됩니다.

  • 모음에서 updateMany 또는 deleteMany 같은 장기 실행 쓰기 작업을 수행하는 경우, 장기 실행 쓰기 작업이 완료될 때까지 변경 스트림 이벤트의 쓰기를 일시적으로 중단할 수 있습니다.

  • Amazon DocumentDB는 MongoDB 작업 로그(oplog)를 지원하지 않습니다.

  • Amazon DocumentDB를 사용하면 지정된 컬렉션에서 변경 스트림을 명시적으로 활성화해야 합니다.

  • 변경 스트림 이벤트의 총 크기(변경 데이터 및 요청된 경우 전체 문서 포함)가 16 MB보다 크면 클라이언트가 변경 스트림에서 읽기에 실패합니다.

  • db.watch()client.watch() v3.6과 함께 사용하는 경우 현재 Ruby 드라이버가 지원되지 않습니다.Amazon DocumentDB

변경 스트림 활성화

해당 데이터베이스 내의 모든 모음 또는 선택한 모음에 대해 Amazon DocumentDB 변경 스트림을 활성화할 수 있습니다. 다음은 mongo 셸을 사용하여 다른 사용 사례에 변경 스트림을 활성화하는 방법의 예입니다. 데이터베이스 및 모음 이름을 지정할 때 빈 문자열은 와일드카드로 취급됩니다.

//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});

다음과 같은 경우 컬렉션에 대해 변경 스트림이 활성화됩니다.

  • 데이터베이스와 컬렉션 모두 명시적으로 활성화되어 있습니다.

  • 컬렉션을 포함하는 데이터베이스가 활성화되어 있습니다.

  • 모든 데이터베이스가 활성화되어 있습니다.

상위 데이터베이스에도 변경 스트림이 활성화되어 있거나 클러스터의 모든 데이터베이스가 활성화되어 있는 경우 데이터베이스에서 컬렉션을 삭제해도 해당 컬렉션에 대한 변경 스트림이 비활성화되지 않습니다. 삭제된 컬렉션과 동일한 이름으로 새 컬렉션이 생성되면 해당 컬렉션에 대해 변경 스트림이 활성화됩니다.

$listChangeStreams 집계 파이프라인 단계를 사용하여 클러스터의 활성화된 모든 변경 스트림을 나열할 수 있습니다. Amazon DocumentDB에서 지원하는 모든 집계 단계는 추가 처리를 위해 파이프라인에서 사용할 수 있습니다. 이전에 활성화된 컬렉션이 비활성화된 경우 $listChangeStreams 출력에 나타나지 않습니다.

//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));

예: Python에서 변경 스트림 사용

다음은 컬렉션 수준에서 Python과 함께 Amazon DocumentDB 변경 스트림을 사용하는 예제입니다.

import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, ssl='true', ssl_ca_certs='rds-combined-ca-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """

다음은 데이터베이스 수준에서 Python과 함께 Amazon DocumentDB 변경 스트림을 사용하는 예입니다.

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, ssl='true', ssl_ca_certs='rds-combined-ca-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """

전체 문서 조회

변경 사항 업데이트 이벤트에는 전체 문서가 포함되지 않으며 변경된 내용만 포함됩니다. 사용 사례에 업데이트의 영향을 받는 전체 문서가 필요한 경우, 스트림을 열 때 전체 문서 조회를 활성화할 수 있습니다.

변경 스트림 업데이트 이벤트에 대한 fullDocument 문서는 문서 조회 시 업데이트된 문서의 최신 버전을 나타냅니다. 업데이트 작업과 fullDocument 조회 사이에 변경 사항이 발생한 경우 fullDocument 문서가 업데이트 당시의 문서 상태를 나타내지 않을 수 있습니다.

#Create a stream object with update lookup enabled stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next() #Output: {'_id': {'_data': '015daf9b7c00000001010000000100009025'}, 'clusterTime': Timestamp(1571789692, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}

변경 스트림 재개

마지막으로 검색된 변경 이벤트 문서의 _id 필드와 동일한 다시 시작 토큰을 사용하여 나중에 변경 스트림을 다시 시작할 수 있습니다.

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, ssl='true', ssl_ca_certs='rds-combined-ca-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """

를 사용하여 변경 스트림 다시 시작startAtOperationTime

나중에 startAtOperationTime를 사용하여 특정 타임스탬프에서 변경 스트림을 재개할 수 있습니다.

참고

를 사용할 수 있는 기능은 startAtOperationTime 4.0+에서 사용할 수 있습니다.Amazon DocumentDB 를 사용할 때 변경 스트림 커서는 지정된 타임스탬프 당일 또는 이후에 발생한 변경 사항만 반환합니다.startAtOperationTimestartAtOperationTime 명령은 상호 배타적이므로 함께 사용할 수 없습니다.resumeAfter

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, ssl='true', ssl_ca_certs='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """

변경 스트림의 트랜잭션

변경 스트림 이벤트에는 커밋되지 않았거나 중단된 트랜잭션의 이벤트가 포함되지 않습니다. 예를 들어, 하나의 INSERT 작업과 하나의 UPDATE 작업 및 으로 트랜잭션을 시작하는 경우. 작업이 성공하지만 INSERT 작업이 실패하면 트랜잭션이 롤백됩니다.UPDATE 이 트랜잭션을 롤백했기 때문에 변경 스트림에 이 트랜잭션에 대한 이벤트가 포함되지 않습니다.

변경 스트림 로그 보존 기간 수정

또는 AWS Management 콘솔를 사용하여 변경 스트림 로그 보존 기간을 1~7시간으로 수정할 수 있습니다.AWS CLI

변경 스트림 로그 보존 기간을 수정하려면

  1. AWS Management 콘솔에 로그인한 다음 https://console.aws.amazon.com/docdb에서 Amazon DocumentDB 콘솔을 엽니다.

  2. 탐색 창에서 파라미터 그룹을 선택합니다.

    작은 정보

    화면 왼쪽에 탐색 창이 표시되지 않으면 페이지 왼쪽 상단 모서리에서 메뉴 아이콘()을 선택하십시오.

  3. 파라미터 그룹 창에서 클러스터와 연결된 클러스터 파라미터 그룹을 선택합니다. 클러스터와 연결된 클러스터 파라미터 그룹을 식별하려면 Amazon DocumentDB 클러스터의 파라미터 그룹 확인 단원을 참조하십시오.

  4. 결과 페이지에는 이 클러스터 파라미터 그룹에 대한 파라미터 및 해당 세부 정보가 표시됩니다. change_stream_log_retention_duration 파라미터를 선택합니다.

  5. 페이지 오른쪽 상단에서 편집을 선택하여 파라미터의 값을 변경합니다. 파라미터는 1~7일 사이로 수정할 수 있습니다.change_stream_log_retention_duration

  6. 변경한 다음 Modify cluster parameter(클러스터 파라미터 수정)를 선택하여 변경 사항을 저장합니다. 변경 사항을 취소하려면 취소를 선택합니다.

클러스터 파라미터 그룹의 change_stream_log_retention_duration 파라미터를 수정하려면 다음 파라미터와 함께 modify-db-cluster-parameter-group 작업을 사용합니다.

  • --db-cluster-parameter-group-name — 필수. 수정하려는 클러스터 파라미터 그룹의 이름입니다. 클러스터와 연결된 클러스터 파라미터 그룹을 식별하려면 Amazon DocumentDB 클러스터의 파라미터 그룹 확인 단원을 참조하십시오.

  • --parameters — 필수. 수정 중인 파라미터입니다. 각 파라미터 요소는 다음을 포함해야 합니다.

    • ParameterName — 수정 중인 파라미터의 이름입니다. 이 경우에는 change_stream_log_retention_duration입니다.

    • ParameterValue — 이 파라미터의 새 값입니다.

    • ApplyMethod — 이 파라미터에 대한 변경 사항을 적용할 방법입니다. 허용된 값은 immediatepending-reboot입니다.

      참고

      ApplyType인 파라미터는 staticApplyMethod여야 합니다.pending-reboot

  1. 파라미터 change_stream_log_retention_duration의 값을 변경하려면 다음 명령을 실행하고 parameter-value를 파라미터를 수정할 값으로 바꿉니다.

    Linux, macOS 또는 Unix의 경우:

    aws docdb modify-db-cluster-parameter-group \ --db-cluster-parameter-group-name sample-parameter-group \ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    Windows의 경우:

    aws docdb modify-db-cluster-parameter-group ^ --db-cluster-parameter-group-name sample-parameter-group ^ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    이 작업의 출력은 다음과 같습니다(JSON 형식).

    { "DBClusterParameterGroupName": "sample-parameter-group" }
  2. 적어도 5분을 기다립니다.

  3. sample-parameter-group의 파라미터 값을 나열하여 변경 사항을 확인합니다.

    Linux, macOS 또는 Unix의 경우:

    aws docdb describe-db-cluster-parameters \ --db-cluster-parameter-group-name sample-parameter-group

    Windows의 경우:

    aws docdb describe-db-cluster-parameters ^ --db-cluster-parameter-group-name sample-parameter-group

    이 작업의 출력은 다음과 같습니다(JSON 형식).

    { "Parameters": [ { "ParameterName": "audit_logs", "ParameterValue": "disabled", "Description": "Enables auditing on cluster.", "Source": "system", "ApplyType": "dynamic", "DataType": "string", "AllowedValues": "enabled,disabled", "IsModifiable": true, "ApplyMethod": "pending-reboot" }, { "ParameterName": "change_stream_log_retention_duration", "ParameterValue": "12345", "Description": "Duration of time in seconds that the change stream log is retained and can be consumed.", "Source": "user", "ApplyType": "dynamic", "DataType": "integer", "AllowedValues": "3600-86400", "IsModifiable": true, "ApplyMethod": "immediate" } ] }