아마존 DocumentDB를 통한 변경 스트림 사용 - Amazon DocumentDB

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

아마존 DocumentDB를 통한 변경 스트림 사용

Amazon DocumentDB의 변경 스트림 기능 (MongoDB와 호환 가능) 은 클러스터 컬렉션 내에서 발생하는 변경 이벤트를 시간순으로 정렬하여 제공합니다. 변경 스트림에서 이벤트를 읽어 다음을 비롯한 다양한 사용 사례를 구현할 수 있습니다.

  • 변경 알림

  • 아마존을 통한 전체 텍스트 검색 OpenSearch 서비스 (OpenSearch 서비스)

  • 아마존 레드시프트를 사용한 분석

애플리케이션에서는 변경 스트림을 사용하여 개별 컬렉션의 데이터 변경 사항을 구독할 수 있습니다. 변경 스트림 이벤트는 클러스터에서 발생하는 순서대로 정렬되며 이벤트가 기록된 후 3시간(기본값) 동안 저장됩니다. 를 사용하여 보존 기간을 최대 7일까지 연장할 수 있습니다.change_stream_log_retention_duration파라미터. 변경 스트림 보존 기간을 수정하려면 을 참조하십시오.변경 스트림 로그 보존 기간 수정.

지원되는 작업

Amazon DocumentDB는 변경 스트림에 대해 다음과 같은 작업을 지원합니다.

  • MongoDB에서 지원되는 모든 변경 이벤트는db.collection.watch(),db.watch()client.watch()아피.

  • 업데이트를 위한 전체 문서 조회.

  • 집계 단계:$match,$project,$redact, 그리고$addFields$replaceRoot.

  • 이력서 토큰에서 변경 스트림 재개

  • 를 사용하여 타임스탬프에서 변경 스트림 재개startAtOperation(아마존 도큐먼트DB v4.0 이상에 적용 가능)

결제

Amazon DocumentDB 변경 스트림 기능은 기본적으로 비활성화되어 있으며 이 기능이 활성화되기 전까지는 추가 요금이 발생하지 않습니다. 클러스터에서 변경 스트림을 사용하면 읽기 및 쓰기 IO와 스토리지 비용이 추가로 발생합니다. 다음을 사용할 수 있습니다.modifyChangeStreamsAPI 작업을 통해 클러스터에서 이 기능을 활성화할 수 있습니다. 요금에 대한 자세한 내용은 을 참조하십시오.아마존 도큐먼트DB 요금.

제한 사항

Amazon DocumentDB에는 변경 스트림에 다음과 같은 제한이 있습니다.

  • 변경 스트림은 Amazon DocumentDB 클러스터의 기본 인스턴스에 대한 연결에서만 열 수 있습니다. 현재 복제본 인스턴스의 변경 스트림에서 읽기는 지원되지 않습니다. watch() API 작업을 호출할 때 모든 읽기가 기본 인스턴스에 대해 수행되도록 primary 읽기 기본 설정을 지정해야 합니다(예제 단원 참조).

  • 컬렉션의 변경 스트림에 기록된 이벤트는 최대 7일 동안 사용할 수 있습니다 (기본값은 3시간). 변경 스트림 데이터는 새 변경 사항이 발생하지 않은 경우에도 로그 보존 기간 후에 삭제됩니다.

  • 모음에서 updateMany 또는 deleteMany 같은 장기 실행 쓰기 작업을 수행하는 경우, 장기 실행 쓰기 작업이 완료될 때까지 변경 스트림 이벤트의 쓰기를 일시적으로 중단할 수 있습니다.

  • 아마존 DocumentDB는 MongoDB 작업 로그를 지원하지 않습니다 (oplog).

  • Amazon DocumentDB를 사용하면 지정된 컬렉션에서 변경 스트림을 명시적으로 활성화해야 합니다.

  • 변경 스트림 이벤트의 총 크기(변경 데이터 및 요청된 경우 전체 문서 포함)가 16 MB보다 크면 클라이언트가 변경 스트림에서 읽기에 실패합니다.

  • Ruby 드라이버는 현재 사용 시 지원되지 않습니다.db.watch()client.watch()아마존 도큐먼트DB v3.6을 사용합니다.

변경 스트림 활성화

특정 데이터베이스 내의 모든 컬렉션 또는 선택한 컬렉션에 대해서만 Amazon DocumentDB 변경 스트림을 활성화할 수 있습니다. 다음은 mongo 셸을 사용하여 다른 사용 사례에 변경 스트림을 활성화하는 방법의 예입니다. 데이터베이스 및 모음 이름을 지정할 때 빈 문자열은 와일드카드로 취급됩니다.

//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});

다음과 같은 경우 컬렉션에 대해 변경 스트림이 활성화됩니다.

  • 데이터베이스와 컬렉션 모두 명시적으로 활성화되어 있습니다.

  • 컬렉션을 포함하는 데이터베이스가 활성화되어 있습니다.

  • 모든 데이터베이스가 활성화되어 있습니다.

상위 데이터베이스에도 변경 스트림이 활성화되어 있거나 클러스터의 모든 데이터베이스가 활성화되어 있는 경우 데이터베이스에서 컬렉션을 삭제해도 해당 컬렉션에 대한 변경 스트림이 비활성화되지 않습니다. 삭제된 컬렉션과 동일한 이름으로 새 컬렉션이 생성되면 해당 컬렉션에 대해 변경 스트림이 활성화됩니다.

$listChangeStreams 집계 파이프라인 단계를 사용하여 클러스터의 활성화된 모든 변경 스트림을 나열할 수 있습니다. Amazon DocumentDB에서 지원하는 모든 집계 단계를 파이프라인에서 추가 처리를 위해 사용할 수 있습니다. 이전에 활성화된 컬렉션이 비활성화된 경우 $listChangeStreams 출력에 나타나지 않습니다.

//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));

예: Python에서 변경 스트림 사용

다음은 Amazon DocumentDB 변경 스트림을 Python과 함께 컬렉션 수준에서 사용하는 예입니다.

import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """

다음은 데이터베이스 수준에서 Python과 함께 Amazon DocumentDB 변경 스트림을 사용하는 예입니다.

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """

전체 문서 조회

변경 사항 업데이트 이벤트에는 전체 문서가 포함되지 않으며 변경된 내용만 포함됩니다. 사용 사례에 업데이트의 영향을 받는 전체 문서가 필요한 경우, 스트림을 열 때 전체 문서 조회를 활성화할 수 있습니다.

변경 스트림 업데이트 이벤트에 대한 fullDocument 문서는 문서 조회 시 업데이트된 문서의 최신 버전을 나타냅니다. 업데이트 작업과 fullDocument 조회 사이에 변경 사항이 발생한 경우 fullDocument 문서가 업데이트 당시의 문서 상태를 나타내지 않을 수 있습니다.

#Create a stream object with update lookup enabled stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next() #Output: {'_id': {'_data': '015daf9b7c00000001010000000100009025'}, 'clusterTime': Timestamp(1571789692, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}

변경 스트림 재개

마지막으로 검색된 변경 이벤트 문서의 _id 필드와 동일한 다시 시작 토큰을 사용하여 나중에 변경 스트림을 다시 시작할 수 있습니다.

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """

다음을 통해 변경 스트림 재개startAtOperationTime

다음을 사용하여 특정 타임스탬프부터 나중에 변경 스트림을 재개할 수 있습니다.startAtOperationTime.

참고

다음을 사용할 수 있습니다.startAtOperationTime아마존 도큐먼트DB 4.0 이상에서 사용할 수 있습니다. 사용하는 경우startAtOperationTime변경 스트림 커서는 지정된 타임스탬프 시점 또는 이후에 발생한 변경 사항만 반환합니다. The startAtOperationTimeresumeAfter명령은 상호 배타적이므로 함께 사용할 수 없습니다.

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """

변경 스트림의 트랜잭션

변경 스트림 이벤트에는 커밋되지 않았거나 중단된 트랜잭션의 이벤트는 포함되지 않습니다. 예를 들어, 하나의 트랜잭션으로 트랜잭션을 시작하는 경우INSERT오퍼레이션과 하나UPDATE수술 및. 만약INSERT수술이 성공하지만UPDATE작업이 실패하면 트랜잭션이 롤백됩니다. 이 트랜잭션이 롤백되었으므로 변경 스트림에는 이 트랜잭션에 대한 이벤트가 포함되지 않습니다.

변경 스트림 로그 보존 기간 수정

를 사용하여 변경 스트림 로그 보존 기간을 1시간에서 7일 사이로 수정할 수 있습니다.AWS Management Console또는AWS CLI.

Using the AWS Management Console
변경 스트림 로그 보존 기간을 수정하려면
  1. 에 로그인AWS Management Console다음 위치에서 Amazon DocumentDB 콘솔을 엽니다.https://console.aws.amazon.com/docdb.

  2. 탐색 창에서 파라미터 그룹을 선택합니다.

    작은 정보

    화면 왼쪽에 탐색 창이 표시되지 않으면 페이지 왼쪽 상단 모서리에서 메뉴 아이콘()을 선택하십시오.

  3. 파라미터 그룹 창에서 클러스터와 연결된 클러스터 파라미터 그룹을 선택합니다. 클러스터와 연결된 클러스터 파라미터 그룹을 식별하려면 아마존 DocumentDB 클러스터의 파라미터 그룹 결정 단원을 참조하십시오.

  4. 결과 페이지에는 이 클러스터 파라미터 그룹에 대한 파라미터 및 해당 세부 정보가 표시됩니다. change_stream_log_retention_duration 파라미터를 선택합니다.

  5. 페이지 오른쪽 상단에서 편집을 선택하여 파라미터의 값을 변경합니다. The change_stream_log_retention_duration파라미터는 1시간에서 7일 사이로 수정할 수 있습니다.

  6. 변경한 다음 Modify cluster parameter(클러스터 파라미터 수정)를 선택하여 변경 사항을 저장합니다. 변경 사항을 취소하려면 취소를 선택합니다.

Using the AWS CLI

클러스터 파라미터 그룹의 change_stream_log_retention_duration 파라미터를 수정하려면 다음 파라미터와 함께 modify-db-cluster-parameter-group 작업을 사용합니다.

  • --db-cluster-parameter-group-name - 필수입니다. 수정하려는 클러스터 파라미터 그룹의 이름입니다. 클러스터와 연결된 클러스터 파라미터 그룹을 식별하려면 아마존 DocumentDB 클러스터의 파라미터 그룹 결정 단원을 참조하십시오.

  • --parameters - 필수입니다. 수정 중인 파라미터입니다. 각 파라미터 요소는 다음을 포함해야 합니다.

    • ParameterName— 수정 중인 매개 변수의 이름. 이 경우에는 change_stream_log_retention_duration입니다.

    • ParameterValue— 이 매개 변수의 새 값입니다.

    • ApplyMethod— 이 매개 변수에 변경 내용을 적용하려는 방법 허용된 값은 immediatepending-reboot입니다.

      참고

      staticApplyType 파라미터에는 pending-rebootApplyMethod이 있어야 합니다.

  1. 파라미터 change_stream_log_retention_duration의 값을 변경하려면 다음 명령을 실행하고 parameter-value를 파라미터를 수정할 값으로 바꿉니다.

    Linux, macOS 또는 Unix의 경우:

    aws docdb modify-db-cluster-parameter-group \ --db-cluster-parameter-group-name sample-parameter-group \ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    Windows의 경우:

    aws docdb modify-db-cluster-parameter-group ^ --db-cluster-parameter-group-name sample-parameter-group ^ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    이 작업의 출력은 다음과 같습니다(JSON 형식).

    { "DBClusterParameterGroupName": "sample-parameter-group" }
  2. 적어도 5분을 기다립니다.

  3. sample-parameter-group의 파라미터 값을 나열하여 변경 사항을 확인합니다.

    Linux, macOS 또는 Unix의 경우:

    aws docdb describe-db-cluster-parameters \ --db-cluster-parameter-group-name sample-parameter-group

    Windows의 경우:

    aws docdb describe-db-cluster-parameters ^ --db-cluster-parameter-group-name sample-parameter-group

    이 작업의 출력은 다음과 같습니다(JSON 형식).

    { "Parameters": [ { "ParameterName": "audit_logs", "ParameterValue": "disabled", "Description": "Enables auditing on cluster.", "Source": "system", "ApplyType": "dynamic", "DataType": "string", "AllowedValues": "enabled,disabled", "IsModifiable": true, "ApplyMethod": "pending-reboot" }, { "ParameterName": "change_stream_log_retention_duration", "ParameterValue": "12345", "Description": "Duration of time in seconds that the change stream log is retained and can be consumed.", "Source": "user", "ApplyType": "dynamic", "DataType": "integer", "AllowedValues": "3600-86400", "IsModifiable": true, "ApplyMethod": "immediate" } ] }
참고

스트림 로그 보존을 변경해도 구성된 것보다 오래된 로그는 삭제되지 않습니다.change_stream_log_retention_duration로그 크기가 (>) 51,200MB보다 커질 때까지의 값입니다.