Menggunakan Aliran Perubahan dengan Amazon DocumentDB - Amazon DocumentDB

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan Aliran Perubahan dengan Amazon DocumentDB

Fitur aliran perubahan di Amazon DocumentDB (dengan kompatibilitas MongoDB) menyediakan urutan waktu peristiwa perubahan yang terjadi dalam koleksi klaster Anda. Anda dapat membaca peristiwa dari aliran perubahan untuk menerapkan banyak kasus penggunaan yang berbeda, termasuk yang berikut:

  • Notifikasi perubahan

  • Pencarian teks lengkap dengan Amazon OpenSearch Layanan (OpenSearch Layanan)

  • Analitik dengan Amazon Redshift

Aplikasi dapat menggunakan aliran perubahan untuk berlangganan perubahan data pada koleksi individu. Peristiwa aliran perubahan diurutkan saat terjadi di klaster dan disimpan selama 3 jam (secara default) setelah peristiwa dicatat. Periode retensi dapat diperpanjang hingga 7 hari menggunakan parameter change_stream_log_retention_duration. Untuk mengubah periode retensi aliran perubahan, lihat Memodifikasi Durasi Retensi Log Aliran Perubahan.

Operasi yang Didukung

Amazon DocumentDB mendukung operasi berikut untuk aliran perubahan:

  • Semua peristiwa perubahan didukung di API db.collection.watch(), db.watch() dan client.watch() MongoDB.

  • Pencarian dokumen lengkap untuk pembaruan.

  • Tahap agregasi: $match, $project, $redact, serta $addFields dan $replaceRoot.

  • Melanjutkan aliran perubahan dari token resume

  • Melanjutkan aliran perubahan dari stempel waktu menggunakan startAtOperation (berlaku untuk Amazon DocumentDB v4.0+)

Penagihan

Fitur aliran perubahan Amazon DocumentDB dinonaktifkan secara default dan tidak dikenakan biaya tambahan apa pun hingga fitur tersebut diaktifkan. Menggunakan aliran perubahan dalam klaster menimbulkan tambahan IO baca dan tulis serta tambahan biaya penyimpanan. Anda dapat menggunakan operasi API modifyChangeStreams untuk mengaktifkan fitur ini untuk klaster Anda. Untuk informasi selengkapnya tentang penentuan harga, lihat Penentuan harga Amazon DocumentDB.

Keterbatasan:

Aliran perubahan memiliki batasan berikut di Amazon DocumentDB:

  • Aliran perubahan hanya dapat dibuka dari koneksi ke instans primer dari klaster Amazon DocumentDB. Membaca dari aliran perubahan pada instans replika saat ini tidak didukung. Ketika memanggil operasi API watch(), Anda harus menentukan preferensi baca primary untuk memastikan bahwa semua pembacaan diarahkan ke instans primer (lihat bagian Contoh).

  • Peristiwa yang ditulis ke aliran perubahan untuk koleksi tersedia hingga 7 hari (defaultnya adalah 3 jam). Data aliran perubahan dihapus setelah jendela durasi retensi log, meskipun tidak ada perubahan baru yang terjadi.

  • Operasi tulis yang berjalan lama pada koleksi seperti updateMany atau deleteMany dapat menghentikan sementara penulisan peristiwa aliran perubahan hingga operasi tulis yang berjalan lama selesai.

  • Amazon DocumentDB tidak mendukung log operasi MongoDB (oplog).

  • Dengan Amazon DocumentDB, Anda harus secara eksplisit mengaktifkan aliran perubahan pada koleksi tertentu.

  • Jika ukuran total peristiwa aliran perubahan (termasuk data perubahan dan dokumen lengkap, jika diminta) lebih besar dari 16 MB, klien akan mengalami kegagalan baca pada aliran perubahan.

  • Driver Ruby saat ini tidak didukung ketika menggunakan db.watch() dan client.watch() dengan Amazon DocumentDB v3.6.

Mengaktifkan Aliran Perubahan

Anda dapat mengaktifkan aliran perubahan Amazon DocumentDB untuk semua koleksi dalam basis data tertentu, atau hanya untuk koleksi yang dipilih. Berikut ini adalah contoh cara mengaktifkan aliran perubahan untuk kasus penggunaan yang berbeda menggunakan shell mongo. String kosong diperlakukan sebagai wildcard ketika menentukan nama basis data dan koleksi.

//Enable change streams for the collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: true});
//Disable change streams on collection "foo" in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "foo", enable: false});
//Enable change streams for all collections in database "bar" db.adminCommand({modifyChangeStreams: 1, database: "bar", collection: "", enable: true});
//Enable change streams for all collections in all databases in a cluster db.adminCommand({modifyChangeStreams: 1, database: "", collection: "", enable: true});

Aliran perubahan akan diaktifkan untuk koleksi jika apa pun dari berikut ini benar:

  • Baik basis data dan koleksi diaktifkan secara eksplisit.

  • Basis data yang berisi koleksi diaktifkan.

  • Semua basis data diaktifkan.

Menjatuhkan koleksi dari basis data tidak menonaktifkan aliran perubahan untuk koleksi tersebut jika basis data induk juga mengaktifkan aliran perubahan, atau jika semua basis data di klaster diaktifkan. Jika koleksi baru dibuat dengan nama yang sama dengan koleksi yang dihapus, aliran perubahan akan diaktifkan untuk koleksi tersebut.

Anda dapat membuat daftar semua aliran perubahan yang diaktifkan klaster Anda dengan menggunakan tahap alur agregasi $listChangeStreams. Semua tahapan agregasi yang didukung oleh Amazon DocumentDB dapat digunakan dalam alur untuk pemrosesan tambahan. Jika koleksi yang sebelumnya diaktifkan telah dinonaktifkan, koleksi tersebut tidak akan muncul di output $listChangeStreams.

//List all databases and collections with change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}], cursor:{}}));
//List of all databases and collections with change streams enabled { "database" : "test", "collection" : "foo" } { "database" : "bar", "collection" : "" } { "database" : "", "collection" : "" }
//Determine if the database “bar” or collection “bar.foo” have change streams enabled cursor = new DBCommandCursor(db, db.runCommand( {aggregate: 1, pipeline: [{$listChangeStreams: 1}, {$match: {$or: [{database: "bar", collection: "foo"}, {database: "bar", collection: ""}, {database: "", collection: ""}]}} ], cursor:{}}));

Contoh: Menggunakan Aliran Perubahan dengan Python

Berikut ini adalah contoh penggunaan aliran perubahan Amazon DocumentDB dengan Python di tingkat koleksi.

import os import sys from pymongo import MongoClient, ReadPreference username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #While ‘Primary’ is the default read preference, here we give an example of #how to specify the required read preference when reading the change streams coll = db.get_collection('foo', read_preference=ReadPreference.PRIMARY) #Create a stream object stream = coll.watch() #Write a new document to the collection to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ #Generate a new change event by updating a document result = coll.update_one({'x': 1}, {'$set': {'x': 2}}) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf99d400000001010000000100009025'}, 'clusterTime': Timestamp(1571789268, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 2}}} """

Berikut ini adalah contoh penggunaan aliran perubahan Amazon DocumentDB dengan Python di tingkat basis data.

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem') db = client['bar'] #Create a stream object stream = db.watch() coll = db.get_collection('foo') #Write a new document to the collection foo to generate a change event coll.insert_one({'x': 1}) #Read the next change event from the stream (if any) print(stream.try_next()) """ Expected Output: {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'insert'} """ #A subsequent attempt to read the next change event returns nothing, as there are no new changes print(stream.try_next()) """ Expected Output: None """ coll = db.get_collection('foo1') #Write a new document to another collection to generate a change event coll.insert_one({'x': 1}) print(stream.try_next()) """ Expected Output: Since the change stream cursor was the database level you can see change events from different collections in the same database {'_id': {'_data': '015daf94f600000002010000000200009025'}, 'clusterTime': Timestamp(1571788022, 2), 'documentKey': {'_id': ObjectId('5daf94f6ea258751778163d6')}, 'fullDocument': {'_id': ObjectId('5daf94f6ea258751778163d6'), 'x': 1}, 'ns': {'coll': 'foo1', 'db': 'bar'}, 'operationType': 'insert'} """

Pencarian Dokumen Lengkap

Peristiwa perubahan pembaruan tidak menyertakan dokumen lengkap; itu mencakup hanya perubahan yang telah dibuat. Jika kasus penggunaan Anda memerlukan dokumen lengkap yang terpengaruh oleh pembaruan, Anda dapat mengaktifkan pencarian dokumen lengkap saat membuka aliran.

Dokumen fullDocument untuk peristiwa aliran perubahan pembaruan mewakili versi terbaru dari dokumen yang diperbarui pada saat pencarian dokumen. Jika terjadi perubahan antara operasi pembaruan dan pencarian fullDocument, dokumen fullDocument mungkin tidak mewakili status dokumen pada waktu pembaruan.

#Create a stream object with update lookup enabled stream = coll.watch(full_document='updateLookup') #Generate a new change event by updating a document result = coll.update_one({'x': 2}, {'$set': {'x': 3}}) stream.try_next() #Output: {'_id': {'_data': '015daf9b7c00000001010000000100009025'}, 'clusterTime': Timestamp(1571789692, 1), 'documentKey': {'_id': ObjectId('5daf9502ea258751778163d7')}, 'fullDocument': {'_id': ObjectId('5daf9502ea258751778163d7'), 'x': 3}, 'ns': {'coll': 'foo', 'db': 'bar'}, 'operationType': 'update', 'updateDescription': {'removedFields': [], 'updatedFields': {'x': 3}}}

Melanjutkan Aliran Perubahan

Anda dapat melanjutkan aliran perubahan nanti dengan menggunakan token resume, yang sama dengan bidang _id dari dokumen peristiwa perubahan yang terakhir diambil.

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='global-bundle.pem', retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() token = event['_id'] print(token) """ Output: This is the resume token that we will later us to resume the change stream {'_data': '015daf9c5b00000001010000000100009025'} """ #Python provides a nice shortcut for getting a stream’s resume token print(stream.resume_token) """ Output {'_data': '015daf9c5b00000001010000000100009025'} """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) #Generate another change event by inserting a document result = coll.insert_one({'y': 5}) #Open a stream starting after the selected resume token stream = db.watch(full_document='updateLookup', resume_after=token) #Our first change event is the update with the specified _id print(stream.try_next()) """ #Output: Since we are resuming the change stream from the resume token, we will see all events after the first update operation. In our case, the change stream will resume from the update operation {x:5} {'_id': {'_data': '015f7e8f0c000000060100000006000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602129676, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2')}, 'fullDocument': {'_id': ObjectId('5f7e8f0ac423bafbfd9adba2'), 'x': 5}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ #Followed by the insert print(stream.try_next()) """ #Output: {'_id': {'_data': '015f7e8f0c000000070100000007000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602129676, 7), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94')}, 'fullDocument': {'_id': ObjectId('5f7e8f0cbf8c233ed577eb94'), 'y': 5}} """

Melanjutkan Aliran Perubahan dengan startAtOperationTime

Anda dapat melanjutkan aliran perubahan nanti dari stempel waktu tertentu dengan menggunakan startAtOperationTime.

catatan

Kemampuan untuk menggunakan startAtOperationTime tersedia di Amazon DocumentDB 4.0+. Saat menggunakan startAtOperationTime, kursor aliran perubahan hanya akan mengembalikan perubahan yang terjadi pada atau setelah Stempel Waktu yang ditentukan. Perintah startAtOperationTime dan resumeAfter saling eksklusif dan karenanya tidak dapat digunakan bersama.

import os import sys from pymongo import MongoClient username = "DocumentDBusername" password = <Insert your password> clusterendpoint = "DocumentDBClusterEndpoint” client = MongoClient(clusterendpoint, username=username, password=password, tls='true', tlsCAFile='rds-root-ca-2020.pem',retryWrites='false') db = client['bar'] coll = db.get_collection('foo') #Create a stream object stream = db.watch() coll.update_one({'x': 1}, {'$set': {'x': 4}}) event = stream.try_next() timestamp = event['clusterTime'] print(timestamp) """ Output Timestamp(1602129114, 4) """ #Generate a new change event by updating a document result = coll.update_one({'x': 4}, {'$set': {'x': 5}}) result = coll.insert_one({'y': 5}) #Generate another change event by inserting a document #Open a stream starting after specified time stamp stream = db.watch(start_at_operation_time=timestamp) print(stream.try_next()) """ #Output: Since we are resuming the change stream at the time stamp of our first update operation (x:4), the change stream cursor will point to that event {'_id': {'_data': '015f7e941a000000030100000003000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602130970, 3), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e9417c423bafbfd9adbb1')}, 'updateDescription': {'updatedFields': {'x': 4}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: The second event will be the subsequent update operation (x:5) {'_id': {'_data': '015f7e9502000000050100000005000fe038'}, 'operationType': 'update', 'clusterTime': Timestamp(1602131202, 5), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e94ffc423bafbfd9adbb2')}, 'updateDescription': {'updatedFields': {'x': 5}, 'removedFields': []}} """ print(stream.try_next()) """ #Output: And finally the last event will be the insert operation (y:5) {'_id': {'_data': '015f7e9502000000060100000006000fe038'}, 'operationType': 'insert', 'clusterTime': Timestamp(1602131202, 6), 'ns': {'db': 'bar', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5f7e95025c4a569e0f6dde92')}, 'fullDocument': {'_id': ObjectId('5f7e95025c4a569e0f6dde92'), 'y': 5}} """

Transaksi di aliran perubahan

Peristiwa aliran perubahan tidak akan berisi peristiwa dari transaksi yang tidak dikomit dan/atau dibatalkan. Misalnya, jika Anda memulai transaksi dengan satu operasi INSERT dan satu operasi UPDATE dan. Jika operasi INSERT Anda berhasil, tetapi operasi UPDATE gagal, transaksi akan digulung kembali. Karena transaksi ini digulung kembali, aliran perubahan Anda tidak akan berisi peristiwa apa pun untuk transaksi ini.

Memodifikasi Durasi Retensi Log Aliran Perubahan

Anda dapat mengubah durasi retensi log aliran perubahan menjadi antara 1 jam dan 7 hari menggunakan AWS Management Console atau AWS CLI.

Using the AWS Management Console
Untuk memodifikasi durasi retensi log aliran perubahan
  1. Masuk ke AWS Management Console, dan buka konsol Amazon DocumentDB di https://console.aws.amazon.com/docdb.

  2. Di panel navigasi, pilih Grup Parameter .

    Tip

    Jika Anda tidak melihat panel navigasi di sisi kiri layar, pilih ikon menu () di pojok kiri atas halaman.

  3. Di panel Grup parameter, pilih grup parameter klaster yang terkait dengan klaster Anda. Untuk mengidentifikasi grup parameter klaster yang terkait dengan klaster Anda, lihat Menentukan grup parameter cluster Amazon DocumentDB.

  4. Halaman yang dihasilkan menunjukkan parameter dan detailnya yang sesuai untuk grup parameter klaster Anda. Pilih parameter change_stream_log_retention_duration.

  5. Di kanan atas halaman, pilih Edit untuk mengubah nilai parameter. Thechange_stream_log_retention_durationparameter dapat dimodifikasi menjadi antara 1 jam dan 7 hari.

  6. Buat perubahan Anda, lalu pilih Modifikasi parameter klaster untuk menyimpan perubahan. Untuk membuang perubahan Anda, pilih Batal.

Using the AWS CLI

Untuk mengubah parameter change_stream_log_retention_duration grup parameter klaster Anda, gunakan operasi modify-db-cluster-parameter-group dengan parameter berikut:

  • --db-cluster-parameter-group-name — Diperlukan. Nama grup parameter klaster yang Anda modifikasi. Untuk mengidentifikasi grup parameter klaster yang terkait dengan klaster Anda, lihat Menentukan grup parameter cluster Amazon DocumentDB.

  • --parameters — Diperlukan. Parameter yang Anda modifikasi. Setiap entri parameter harus menyertakan hal berikut:

    • ParameterName — Nama parameter yang Anda modifikasi. Dalam hal ini, itu adalah change_stream_log_retention_duration

    • ParameterValue — Nilai baru untuk parameter ini.

    • ApplyMethod — Bagaimana Anda ingin perubahan pada parameter ini diterapkan. Nilai yang diizinkan adalah immediate dan pending-reboot.

      catatan

      Parameter dengan ApplyType dari static harus memiliki ApplyMethod dari pending-reboot.

  1. Untuk mengubah nilai parameter change_stream_log_retention_duration, jalankan perintah berikut dan ganti parameter-value dengan nilai yang ingin Anda modifikasi parameternya.

    Untuk Linux, macOS, atau Unix:

    aws docdb modify-db-cluster-parameter-group \ --db-cluster-parameter-group-name sample-parameter-group \ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    Untuk Windows:

    aws docdb modify-db-cluster-parameter-group ^ --db-cluster-parameter-group-name sample-parameter-group ^ --parameters "ParameterName=change_stream_log_retention_duration,ParameterValue=<parameter-value>,ApplyMethod=immediate"

    Output dari operasi ini terlihat seperti berikut (format JSON).

    { "DBClusterParameterGroupName": "sample-parameter-group" }
  2. Tunggu paling tidak 5 menit.

  3. Daftar nilai parameter dari sample-parameter-group untuk memastikan bahwa perubahan Anda telah dibuat.

    Untuk Linux, macOS, atau Unix:

    aws docdb describe-db-cluster-parameters \ --db-cluster-parameter-group-name sample-parameter-group

    Untuk Windows:

    aws docdb describe-db-cluster-parameters ^ --db-cluster-parameter-group-name sample-parameter-group

    Keluaran dari operasi ini terlihat seperti berikut ini (format JSON).

    { "Parameters": [ { "ParameterName": "audit_logs", "ParameterValue": "disabled", "Description": "Enables auditing on cluster.", "Source": "system", "ApplyType": "dynamic", "DataType": "string", "AllowedValues": "enabled,disabled", "IsModifiable": true, "ApplyMethod": "pending-reboot" }, { "ParameterName": "change_stream_log_retention_duration", "ParameterValue": "12345", "Description": "Duration of time in seconds that the change stream log is retained and can be consumed.", "Source": "user", "ApplyType": "dynamic", "DataType": "integer", "AllowedValues": "3600-86400", "IsModifiable": true, "ApplyMethod": "immediate" } ] }
catatan

Mengubah retensi log aliran tidak akan menghapus log yang lebih lama dari yang dikonfigurasichange_stream_log_retention_durationnilai sampai ukuran log lebih besar dari (>) 51.200MB.