Amazon DocumentDB での変更ストリームの使用 - Amazon DocumentDB

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon DocumentDB での変更ストリームの使用

Amazon DocumentDB (MongoDB 互換) の変更ストリーム機能は、クラスターのコレクション内で発生する変更イベントの時系列シーケンスを提供します。変更ストリームからイベントを読み取って、次のようなさまざまなユースケースを実装できます。

  • 変更通知

  • Amazon OpenSearch Service (OpenSearch Service) による全文検索

  • Amazon Redshift を用いてのイベントの分析

アプリケーションは変更ストリームを使用して、個々のコレクションでデータ変更をサブスクライブすることができます。変更ストリームイベントは、クラスターで発生する際に順序付けされ、イベントが記録されてから 3 時間 (デフォルト) 保存されます。change_stream_log_retention_duration のパラメータを使って、保存期間を7日まで延長できます。変更ストリーム保持期間を変更するには、Modifying the Change Stream Log Retention Duration を参照してください。

サポートされているオペレーション

Amazon DocumentDB では、変更ストリームに対して次の操作がサポートされています。

  • MongoDB 、db.collection.watch()db.watch()および でサポートされているすべての変更イベントclient.watch()API。

  • 更新のための完全ドキュメント参照。

  • 集約ステージ:$match$project$redact、および $addFields そして $replaceRoot

  • 再開トークンからの変更ストリームの再開

  • を使用してタイムスタンプから変更ストリームを再開する startAtOperation (Amazon DocumentDB 4.0 以降に適用)

「請求」

Amazon DocumentDB 変更ストリーム機能はデフォルトで無効になっており、この機能が有効化されるまで追加料金は発生しません。クラスターで変更ストリームを使用すると、追加の読み取り/書き込みIOsおよびストレージコストが発生します。modifyChangeStreams API オペレーションを使用して、クラスターでこの機能を有効にできます。料金のさらなる詳細については、Amazon DocumentDB の料金 を参照してください。

制限事項

変更ストリームには、Amazon DocumentDB で以下の制限があります。

  • Amazon DocumentDB 3.6 および Amazon DocumentDB 4.0 では、変更ストリームは Amazon DocumentDB クラスターのプライマリインスタンスへの接続からのみ開くことができます。レプリカインスタンスの変更ストリームからの読み取りは、Amazon DocumentDB 3.6 および Amazon DocumentDB 4.0 ではサポートされていません。watch() API オペレーションを呼び出すときは、primary読み取り設定を指定して、すべての読み取りがプライマリインスタンスに転送されるようにする必要があります (セクションを参照)。

  • Amazon DocumentDB 5.0 では、グローバルクラスターを含むプライマリインスタンスとセカンダリインスタンスの両方から変更ストリームを開くことができます。セカンダリ読み取り設定を指定して、変更ストリームをセカンダリインスタンスにリダイレクトできます。その他のベストプラクティスと制限については、セカンダリインスタンスでの変更ストリームの使用「」を参照してください。

  • コレクションの変更ストリームに書き込まれるイベントは、最大 7 日間 (デフォルトは 3 時間) 使用できます。変更ストリームのデータは、新しい変更が行われていない場合でも、ログの保持期間が終了すると削除されます。

  • updateManydeleteMany のようなコレクションで長時間実行されている書き込みオペレーションは、長時間実行されている書き込みオペレーションが完了するまで、変更ストリームイベントの書き込みを一時的に停止する可能性があります。

  • Amazon DocumentDB は、MongoDB オペレーションログ (oplog) をサポートしていません。

  • Amazon DocumentDB を用いて、特定のコレクションで変更ストリームを明示的に有効にする必要があります。

  • 変更ストリームイベントの合計サイズ (要求された場合、変更データとドキュメント全体を含む) が 16 MB より大きい場合、変更ストリームでの読み取りエラーがクライアントで発生します。

  • Amazon DocumentDB 3.6 client.watch()db.watch()および を使用する場合、Ruby ドライバーは現在サポートされていません。

変更ストリームの有効化

Amazon DocumentDB 変更ストリームは、特定のデータベース内のすべてのコレクションに対して有効にすることも、選択したコレクションに対してのみ有効にすることもできます。以下は、mongo シェルを使用してさまざまなユースケースの変更ストリームを有効にする方法の例です。データベース名とコレクション名を指定する場合、空の文字列はワイルドカードとして扱われます。

//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});

次のいずれかに該当する場合、変更ストリームはコレクションに対して有効になります。

  • データベースとコレクションの両方が明示的に有効になっている。

  • コレクションを含むデータベースが有効になっている。

  • すべてのデータベースが有効になっている。

親データベースでも変更ストリームが有効になっている場合、またはクラスター内のすべてのデータベースが有効になっている場合は、データベースからコレクションを削除しても、そのコレクションの変更ストリームは無効になりません。削除されたコレクションと同じ名前で新しいコレクションが作成された場合、そのコレクションに対して変更ストリームが有効になります。

$listChangeStreams 集約パイプラインステージを使用すると、クラスターで有効なすべての変更ストリームを一覧表示できます。Amazon DocumentDB でサポートされているすべての集約ステージは、パイプラインで追加の処理に使用できます。以前に有効にしたコレクションが無効になっている場合、そのコレクションは $listChangeStreams 出力に表示されません。

//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));

例: Python で変更ストリームを使用する

コレクションレベルで、 Python を用いて Amazon DocumentDB 変更ストリームを使用する例を次に示します。

import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """

データベースレベルで、 Python を用いて Amazon DocumentDB 変更ストリームを使用する例を次に示します。

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """

完全なドキュメント検索

変更の更新イベントには、ドキュメント全体は含まれません。加えられた変更のみが含まれます。ユースケースで、更新の影響を受ける完全なドキュメントが必要な場合は、ストリームを開くときに完全なドキュメント参照を有効にできます。

変更の更新ストリームイベントの fullDocument ドキュメントは、ドキュメント参照時に更新されたドキュメントの最新バージョンを表します。更新オペレーションと fullDocument ルックアップの間で変更が発生した場合、fullDocument ドキュメントが更新時にドキュメントの状態を表していない可能性があります。

更新ルックアップを有効にしてストリームオブジェクトを作成するには、次の例を使用します。

stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next()

ストリームオブジェクトの出力は次のようになります。

{'_id': {'_data': '015daf9b7c00000001010000000100009025'}, 'clusterTime': Timestamp(1571789692, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}

変更ストリームの再開

再開トークンを使用すると、変更ストリームを後で再開できます。このトークンは、最後に取得した変更イベントドキュメントの _id フィールドと同じです。

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """

で変更ストリームを再開する startAtOperationTime

startAtOperationTime を使って、特定のタイムスタンプから後で変更ストリームを再開できます。

注記

startAtOperationTime を使用する能力は、Amazon DocumentDB 4.0+ で入手可能です。startAtOperationTime を使用する場合、変更ストリームカーソルは、指定された Timestamp 以降で発生した変更のみを返します。startAtOperationTimeresumeAfter のコマンドは相互に排他的であるため、一緒に使用することはできません。

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """

変更ストリーム内のトランザクション

変更ストリームイベントには、コミットされていないトランザクションや中止されたトランザクションのイベントは含まれません。例えば、トランザクションを 1 つの INSERT のオペレーションと 1 つの UPDATE のオペレーション、およびで開始する場合、もし INSERT のオペレーションが成功しても UPDATE の操作が失敗すると、トランザクションがロールバックされます。このトランザクションはロールバックされたため、変更ストリームにはこのトランザクションのイベントは含まれません。

変更ストリームログの保持期間の変更

AWS Management Console または を使用して、変更ストリームログの保持期間を 1 時間から 7 日の間で変更できます AWS CLI。

Using the AWS Management Console
変更ストリームログの保持期間を変更するには
  1. にサインインし AWS Management Console、https://console.aws.amazon.com/docdb で Amazon DocumentDB コンソールを開きます。

  2. ナビゲーションペインで、[パラメータグループ] を選択します。

    ヒント

    画面の左側にナビゲーションペインが表示されない場合は、ページの左上隅にあるメニューアイコン () を選択します。

  3. [パラメータグループ] ペインで、クラスターに関連付けられたクラスターパラメータグループを選択します。クラスターに関連付けられているクラスターパラメータグループを調べるには、「Amazon DocumentDB クラスターのパラメータグループの確認をする」を参照してください。

  4. 結果のページには、クラスターパラメータグループのパラメータと対応する詳細が表示されます。change_stream_log_retention_duration パラメータを選択します。

  5. ページの右上にある [編集] を選択して、パラメータの値を変更します。change_stream_log_retention_duration のパラメータは、1 時間から 7 日の間で変更できます。

  6. 変更を行い、[クラスターのパラメータを変更] を選択して変更を保存します。変更を破棄するには、[キャンセル] を選択します。

Using the AWS CLI

クラスターパラメータグループの change_stream_log_retention_duration パラメータを変更するには、以下のパラメータを指定して modify-db-cluster-parameter-group オペレーションを使用します。

  • --db-cluster-parameter-group-name — 必須。変更するクラスターパラメータグループの名前。クラスターに関連付けられているクラスターパラメータグループを調べるには、「Amazon DocumentDB クラスターのパラメータグループの確認をする」を参照してください。

  • --parameters — 必須。変更するパラメータ。各パラメータの入力には以下を含める必要があります。

    • ParameterName - 変更しているパラメータの名前。この場合は change_stream_log_retention_duration

    • ParameterValue - このパラメータの新しい値。

    • ApplyMethod - このパラメータの変更を適用する方法です。有効な値は、immediate および pending-reboot です。

      注記

      ApplyTypestatic であるパラメータでは、ApplyMethodpending-reboot である必要があります。

  1. パラメータ change_stream_log_retention_duration の値を変更するには、次のコマンドを実行し、parameter-value をパラメータの変更後の値に置き換えます。

    Linux、macOS、Unix の場合:

    aws docdb modify-db-cluster-parameter-group \ --db-cluster-parameter-group-name sample-parameter-group \ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    Windows の場合:

    aws docdb modify-db-cluster-parameter-group ^ --db-cluster-parameter-group-name sample-parameter-group ^ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    このオペレーションからの出力は次のようになります (JSON 形式)。

    { "DBClusterParameterGroupName": "sample-parameter-group" }
  2. 少なくとも 5 分お待ち下さい。

  3. sample-parameter-group のパラメータ値を一覧表示し、変更が行われたことを確認します。

    Linux、macOS、Unix の場合:

    aws docdb describe-db-cluster-parameters \ --db-cluster-parameter-group-name sample-parameter-group

    Windows の場合:

    aws docdb describe-db-cluster-parameters ^ --db-cluster-parameter-group-name sample-parameter-group

    このオペレーションからの出力は次のようになります (JSON 形式)。

    { "Parameters": [ { "ParameterName": "audit_logs", "ParameterValue": "disabled", "Description": "Enables auditing on cluster.", "Source": "system", "ApplyType": "dynamic", "DataType": "string", "AllowedValues": "enabled,disabled", "IsModifiable": true, "ApplyMethod": "pending-reboot" }, { "ParameterName": "change_stream_log_retention_duration", "ParameterValue": "12345", "Description": "Duration of time in seconds that the change stream log is retained and can be consumed.", "Source": "user", "ApplyType": "dynamic", "DataType": "integer", "AllowedValues": "3600-86400", "IsModifiable": true, "ApplyMethod": "immediate" } ] }
注記

ストリームログの保存期間を変更しても、ログサイズが 51,200 MB より大きくなるまで (>)、設定した change_stream_log_retention_duration の値より古いログは削除されません。

セカンダリインスタンスでの変更ストリームの使用

セカンダリインスタンスで変更ストリームの使用を開始するには、変更ストリームカーソルをセカンダリreadPreferenceとして開きます。

変更ストリームカーソルを開いて、特定のコレクションまたはクラスターまたはデータベース内のすべてのコレクションの変更イベントを監視できます。任意の Amazon DocumentDB インスタンスで変更ストリームカーソルを開き、ライターインスタンスとリーダーインスタンスの両方から変更ストリームドキュメントを取得できます。変更ストリームトークン ( resumeTokenや などstartOperationTime) は、ライターインスタンスとリーダーインスタンスで開かれたさまざまな変更ストリームカーソル間で共有できます。

import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Your password> clusterendpoint = "DocumentDBClusterEndpoint" client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] # Make sure to use SECONDARY to redirect cursor reads from secondary instances coll = db.get_collection('foo', read_preference=ReadPreference.SECONDARY) # Create a stream object on RO. The token needs to generated from PRIMARY. stream = coll.watch(resumeAfter=token) for event in stream: print(event)

セカンダリインスタンスの変更ストリームのガイドラインと制限

  • 変更ストリームイベントは、プライマリインスタンスからセカンダリインスタンスにレプリケートする必要があります。Amazon の DBInstanceReplicaLagメトリクスから遅延をモニタリングできます CloudWatch。

  • セカンダリインスタンスのタイムスタンプは、プライマリインスタンスと常に同期しているとは限りません。この場合、セカンダリインスタンスのタイムスタンプに遅延が発生し、追いつくことが予想されます。ベストプラクティスとして、セカンダリインスタンスでウォッチを起動resumeTokenするには、 startAtOperationTimeまたは を使用することをお勧めします。

  • ドキュメントサイズが大きく、 を実行しており、プライマリインスタンスで同時書き込みワークロードが高い場合、プライマリインスタンスと比較してfullDocumentLookupセカンダリインスタンスのスループットが低下する可能性があります。ベストプラクティスとして、セカンダリのバッファキャッシュヒット率をモニタリングし、バッファキャッシュヒット率が高いことを確認することをお勧めします。