Pemrograman berbasis acara dengan Amazon DocumentDB dan Java - Amazon DocumentDB

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Pemrograman berbasis acara dengan Amazon DocumentDB dan Java

Pemrograman berbasis peristiwa dalam konteks Amazon DocumentDB mewakili pola arsitektur yang kuat di mana perubahan database berfungsi sebagai generator peristiwa utama yang memicu logika dan proses bisnis berikutnya. Ketika catatan dimasukkan, diperbarui, atau dihapus dalam koleksi DocumentDB, perubahan ini bertindak sebagai peristiwa yang secara otomatis memulai berbagai proses hilir, pemberitahuan, atau tugas sinkronisasi data. Pola ini sangat berharga dalam sistem terdistribusi modern di mana beberapa aplikasi atau layanan perlu bereaksi terhadap perubahan data secara real-time. Mekanisme utama penerapan pemrograman berbasis peristiwa di DocumentDB adalah dengan mengubah aliran.

catatan

Panduan ini mengasumsikan Anda telah mengaktifkan aliran perubahan pada koleksi yang sedang Anda kerjakan. Lihat Menggunakan aliran perubahan dengan Amazon DocumentDB untuk mempelajari cara mengaktifkan aliran perubahan pada koleksi.

Bekerja dengan aliran perubahan dari aplikasi Java

watch()Metode dalam driver Java MongoDB adalah mekanisme utama untuk memantau perubahan data real-time di Amazon DocumentDB. watch()Metode ini dapat dipanggil oleh MongoClient, MongoDatabase, dan MongoCollectionobjek.

watch()Metode ini mengembalikan instance ChangeStreamIterableyang mendukung berbagai opsi konfigurasi, termasuk pencarian dokumen lengkap untuk pembaruan, menyediakan token resume dan stempel waktu untuk keandalan, dan tahapan agregasi pipeline untuk perubahan pemfilteran.

ChangeStreamIterablemengimplementasikan antarmuka inti Java Iterable dan dapat digunakan denganforEach(). Untuk menangkap peristiwa yang digunakanforEach(), teruskan fungsi callback ke forEach() yang memproses peristiwa yang diubah. Cuplikan kode berikut menunjukkan cara membuka aliran perubahan pada koleksi untuk memulai pemantauan peristiwa perubahan:

ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });

Cara lain untuk melintasi semua peristiwa perubahan adalah dengan membuka kursor yang memelihara koneksi ke cluster dan terus menerima peristiwa perubahan baru saat terjadi. Untuk mendapatkan kursor aliran perubahan, gunakan cursor() metode objek. ChangeStreamIterable Contoh kode berikut menunjukkan cara memantau peristiwa perubahan menggunakan kursor:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }

Sebagai praktik terbaik, buat MongoChangeStreamCursordalam try-with-resource pernyataan atau tutup kursor secara manual. Memanggil cursor() metode pada ChangeStreamIterablemengembalikan MongoChangeStreamCursor yang dibuat di atas ChangeStreamDocumentobjek.

ChangeStreamDocumentKelas adalah komponen penting yang mewakili peristiwa perubahan individu dalam aliran. Ini berisi informasi rinci tentang setiap modifikasi, termasuk jenis operasi (menyisipkan, memperbarui, menghapus, mengganti), kunci dokumen, informasi namespace, dan konten dokumen lengkap bila tersedia. Kelas menyediakan metode untuk mengakses berbagai aspek dari peristiwa perubahan, seperti getOperationType() untuk menentukan jenis perubahan, getFullDocument() untuk mengakses status dokumen lengkap, dan getDocumentKey() untuk mengidentifikasi dokumen yang dimodifikasi.

ChangeStreamDocumentObjek menyediakan dua informasi penting, token resume dan waktu peristiwa perubahan.

Lanjutkan token dan operasi berbasis waktu di aliran perubahan DocumentDB menyediakan mekanisme penting untuk menjaga kesinambungan dan mengelola akses data historis. Token resume adalah pengidentifikasi unik yang dihasilkan untuk setiap peristiwa perubahan, berfungsi sebagai bookmark yang memungkinkan aplikasi untuk memulai ulang pemrosesan aliran perubahan dari titik tertentu setelah pemutusan atau kegagalan. Ketika kursor aliran perubahan dibuat, ia dapat menggunakan token resume yang disimpan sebelumnya melalui resumeAfter() opsi, memungkinkan aliran untuk melanjutkan dari tempat yang ditinggalkannya daripada memulai dari awal atau kehilangan acara.

Operasi berbasis waktu dalam aliran perubahan menawarkan pendekatan yang berbeda untuk mengelola titik awal pemantauan peristiwa perubahan. startAtOperationTime()Opsi ini memungkinkan Anda untuk mulai menonton perubahan yang terjadi pada atau setelah stempel waktu tertentu. Fitur berbasis waktu ini sangat berharga dalam skenario yang membutuhkan pemrosesan data historis, point-in-time pemulihan, atau sinkronisasi antar sistem.

Contoh kode berikut mengambil peristiwa yang terkait dengan dokumen insert, menangkap token resume, dan kemudian menyediakan token itu untuk mulai memantau peristiwa setelah acara insert. Acara ini dikaitkan dengan peristiwa pembaruan, kemudian mendapatkan waktu cluster saat pembaruan terjadi dan menggunakan stempel waktu itu sebagai titik awal untuk pemrosesan lebih lanjut.

BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }

Secara default, peristiwa perubahan pembaruan tidak menyertakan dokumen lengkap dan hanya mencakup perubahan yang dibuat. Jika Anda perlu mengakses dokumen lengkap yang diperbarui, Anda dapat memanggil fullDocument() metode pada ChangeStreamIterableobjek. Perlu diingat bahwa ketika Anda meminta dokumen lengkap dikembalikan untuk acara pembaruan, ia mengembalikan dokumen yang ada pada saat panggilan untuk mengubah aliran dibuat.

Metode ini mengambil FullDocumentenum sebagai parameter. Saat ini, Amazon DocumentDB hanya mendukung DEFAULT dan nilai. UPDATE_LOOKUP Cuplikan kode berikut menunjukkan cara meminta dokumen lengkap untuk acara pembaruan saat mulai melihat perubahan:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())