Using Change Streams with Amazon DocumentDB - Amazon DocumentDB

Using Change Streams with Amazon DocumentDB

The change streams feature in Amazon DocumentDB (with MongoDB compatibility) provides a time-ordered sequence of change events that occur within your cluster’s collections. You can read events from a change stream to implement many different use cases, including the following:

  • Change notification

  • Full-text search with Amazon Elasticsearch Service (Amazon ES)

  • Analytics with Amazon Redshift

Applications can use change streams to subscribe to data changes on individual collections. Change streams events are ordered as they occur on the cluster and are stored for 3 hours (by default) after the event has been recorded.

Supported Operations

Amazon DocumentDB supports the following operations for change streams:

  • All change events supported in the MongoDB 3.6 db.collection.watch() API.

  • Full document lookup for updates.

  • Aggregation stages: $match, $project, $redact, and $addFields ($replaceRoot is not currently supported).

  • Resuming a change stream.

Billing

The Amazon DocumentDB change streams feature is disabled by default and does not incur any additional charges until the feature is enabled and used. You can use the modifyChangeStreams API operation to enable this feature for collection in the cluster. Using change streams in a cluster incurs additional IOPS and storage costs. For more information, see Amazon DocumentDB pricing.

Limitations

Change streams have the following limitations in Amazon DocumentDB:

  • Change streams can only be opened from a connection to the primary instance of an Amazon DocumentDB cluster. Reading from change streams on a replica instance is not currently supported. When invoking the watch() API operation, you must specify a primary read preference to ensure that all reads are directed to the primary instance (see the Example section).

  • Events written to a change stream for a collection are available for up to 24 hours (the default is 3 hours). Change streams data is deleted after the log retention duration window, even if no new changes have occurred.

  • A long-running write operation on a collection like updateMany or deleteMany can temporarily stall the writing of change streams events until the long running write operation is complete.

  • Amazon DocumentDB does not support the MongoDB operations log (oplog).

  • With Amazon DocumentDB, you must explicitly enable change streams on a given collection.

  • If the total size of a change streams event (including the change data and full document, if requested) is greater than 16 MB, the client will experience a read failure on the change streams.

Enabling Change Streams

You can enable Amazon DocumentDB change streams for all collections within a given database, or only for selected collections. The following are examples of how to enable change streams for different use cases using the mongo shell. Empty strings are treated as wildcards when specifying database and collection names.

//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});

Change streams will be enabled for a collection if any of the following are true:

  • Both the database and collection are explicitly enabled.

  • The database containing the collection is enabled.

  • All databases are enabled.

Dropping a collection from a database does not disable change streams for that collection if the parent database also has change streams enabled, or if all databases in the cluster are enabled. If a new collection is created with the same name as the deleted collection, change streams will be enabled for that collection.

You can list all of your cluster’s enabled change streams by using the $listChangeStreams aggregation pipeline stage. All aggregation stages supported by Amazon DocumentDB can be used in the pipeline for additional processing. If a previously enabled collection has been disabled, it will not appear in the $listChangeStreams output.

//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));

Example: Using Change Streams with Python

The following is an example of using an Amazon DocumentDB change stream with Python.

from pymongo import MongoClient, ReadPreference client = MongoClient("your-cluster-endpoint") db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event result = coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) stream.try_next() #Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} #A subsequent attempt to read the next change event returns nothing, as there are no new changes stream.try_next() #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) stream.try_next() #Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}}

Full Document Lookup

The update change event does not include the full document; it includes only the change that was made. If your use case requires the complete document affected by an update, you can enable full document lookup when opening the stream.

The fullDocument document for an update change streams event represents the most current version of the updated document at the time of document lookup. If changes occurred between the update operation and the fullDocument lookup, the fullDocument document might not represent the document state at update time.

#Create a stream object with update lookup enabled stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next() #Output: {'_id': {'_data': '015daf9b7c00000001010000000100009025'}, 'clusterTime': Timestamp(1571789692, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}

Resuming a Change Stream

You can resume a change stream later by using a resume token, which is equal to the _id field of the last retrieved change event document.

#Generate a new change event by updating a document result = coll.update_one({'x': 3}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] {'_data': '015daf9c5b00000001010000000100009025'} #Python provides a nice shortcut for getting a stream’s resume token stream.resume_token {'_data': '015daf9c5b00000001010000000100009025'} #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = coll.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id stream.try_next() #Output: {'_id': {'_data': '015daf9d1900000001010000000100009025'}, 'clusterTime': Timestamp(1571790105, 1), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 5}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 5}}} #Followed by the insert stream.try_next() #Output: {'_id': {'_data': '015daf9d8800000001010000000100009025'}, 'clusterTime': Timestamp(1571790216, 1), 'documentKey': {'_id': ObjectId('5daf9d88ea258751778163d9')}, 'fullDocument': {'_id': ObjectId('5daf9d88ea258751778163d9'), 'y': 5}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'}

Modifying the Change Stream Log Retention Duration

You can modify the change stream log retention duration to be between 1 hour and 24 hours using the AWS Management Console or the AWS CLI.

To modify the change stream log retention duration

  1. Sign in to the AWS Management Console, and open the Amazon DocumentDB console at https://console.aws.amazon.com/docdb.

  2. In the navigation pane, choose Parameter groups .

    Tip

    If you don't see the navigation pane on the left side of your screen, choose the menu icon () in the upper-left corner of the page.

  3. In the Parameter groups pane, choose the cluster parameter group that is associated with your cluster. To identify the cluster parameter group that is associated with your cluster, see Determining an Amazon DocumentDB Cluster's Parameter Group.

  4. The resulting page shows the parameters and their corresponding details for your cluster parameter group. Select the parameter change_stream_log_retention_duration.

  5. On the top right of the page, choose Edit to change the value of the parameter. The change_stream_log_retention_duration parameter can be modified to be between 1 and 24 hours.

  6. Make your change, and then choose Modify cluster parameter to save the changes. To discard your changes, choose Cancel.

To modify your cluster parameter group's change_stream_log_retention_duration parameter, use the modify-db-cluster-parameter-group operation with the following parameters:

  • --db-cluster-parameter-group-name — Required. The name of the cluster parameter group that you are modifying. To identify the cluster parameter group that is associated with your cluster, see Determining an Amazon DocumentDB Cluster's Parameter Group.

  • --parameters — Required. The parameter that you are modifying. Each parameter entry must include the following:

    • ParameterName — The name of the parameter that you are modifying. In this case, it is change_stream_log_retention_duration

    • ParameterValue — The new value for this parameter.

    • ApplyMethod — How you want changes to this parameter applied. Permitted values are immediate and pending-reboot.

      Note

      Parameters with the ApplyType of static must have an ApplyMethod of pending-reboot.

  1. To change the values of the parameter change_stream_log_retention_duration, run the following command and replace parameter-value with the value you want to modify the parameter to.

    For Linux, macOS, or Unix:

    aws docdb modify-db-cluster-parameter-group \ --db-cluster-parameter-group-name sample-parameter-group \ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    For Windows:

    aws docdb modify-db-cluster-parameter-group ^ --db-cluster-parameter-group-name sample-parameter-group ^ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    Output from this operation looks something like the following (JSON format).

    { "DBClusterParameterGroupName": "sample-parameter-group" }
  2. Wait at least 5 minutes.

  3. List the parameter values of sample-parameter-group to ensure that your changes have been made.

    For Linux, macOS, or Unix:

    aws docdb describe-db-cluster-parameters \ --db-cluster-parameter-group-name sample-parameter-group

    For Windows:

    aws docdb describe-db-cluster-parameters ^ --db-cluster-parameter-group-name sample-parameter-group

    Output from this operation looks something like the following (JSON format).

    { "Parameters": [ { "ParameterName": "audit_logs", "ParameterValue": "disabled", "Description": "Enables auditing on cluster.", "Source": "system", "ApplyType": "dynamic", "DataType": "string", "AllowedValues": "enabled,disabled", "IsModifiable": true, "ApplyMethod": "pending-reboot" }, { "ParameterName": "change_stream_log_retention_duration", "ParameterValue": "12345", "Description": "Duration of time in seconds that the change stream log is retained and can be consumed.", "Source": "user", "ApplyType": "dynamic", "DataType": "integer", "AllowedValues": "3600-86400", "IsModifiable": true, "ApplyMethod": "immediate" } ] }