使用 Amazon DocumentDB 和 Java 進行事件驅動程式設計 - Amazon DocumentDB

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Amazon DocumentDB 和 Java 進行事件驅動程式設計

Amazon DocumentDB 內容中的事件驅動程式設計代表強大的架構模式,其中資料庫變更做為觸發後續商業邏輯和程序的主要事件產生器。在 DocumentDB 集合中插入、更新或刪除記錄時,這些變更會做為自動啟動各種下游程序、通知或資料同步任務的事件。此模式在現代分散式系統中特別有價值,其中多個應用程式或服務需要即時回應資料變更。在 DocumentDB 中實作事件驅動型程式設計的主要機制是變更串流。

注意

本指南假設您已在正在使用的集合上啟用變更串流。請參閱 以搭配 Amazon DocumentDB 使用變更串流了解如何在集合上啟用變更串流。

從 Java 應用程式使用變更串流

MongoDB Java 驅動程式中的 watch()方法是監控 Amazon DocumentDB 中即時資料變更的主要機制。MongoClientMongoDatabaseMongoCollection 物件可以在 上呼叫 watch()方法。

watch() 方法會傳回支援各種組態選項ChangeStreamIterable的 執行個體,包括更新的完整文件查詢、提供可靠性的恢復權杖和時間戳記,以及篩選變更的管道彙總階段。

ChangeStreamIterable 實作核心 Java 界面Iterable,並可搭配 使用forEach()。若要使用 擷取事件forEach(),請將回呼函數傳遞至 forEach()以處理變更的事件。下列程式碼片段顯示如何在集合上開啟變更串流,以開始變更事件監控:

ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });

周遊所有變更事件的另一種方法是開啟游標,以維持與叢集的連線,並在發生變更事件時持續接收新的變更事件。若要取得變更串流游標,請使用 ChangeStreamIterable 物件的 cursor()方法。下列程式碼範例示範如何使用游標監控變更事件:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }

最佳實務是在 try-with-resource 陳述式MongoChangeStreamCursor中建立 ,或手動關閉游標。在 上呼叫 cursor()方法會ChangeStreamIterable傳回透過ChangeStreamDocument物件建立MongoChangeStreamCursor的 。

ChangeStreamDocument 類別是代表串流中個別變更事件的重要元件。它包含每個修改的詳細資訊,包括操作類型 (插入、更新、刪除、取代)、文件金鑰、命名空間資訊,以及可用的完整文件內容。類別提供存取變更事件各種層面的方法,例如getOperationType()判斷變更類型、getFullDocument()存取完整文件狀態,以及getDocumentKey()識別修改後的文件。

ChangeStreamDocument 物件提供兩項重要的資訊,即變更事件的恢復權杖和時間。

在 DocumentDB 變更串流中恢復字符和以時間為基礎的操作,為維護持續性和管理歷史資料存取提供了關鍵機制。恢復權杖是為每個變更事件產生的唯一識別符,做為書籤,允許應用程式在中斷連線或失敗後從特定點重新啟動變更串流處理。建立變更串流游標時,可以透過 resumeAfter()選項使用先前儲存的恢復權杖,讓串流從停止的位置繼續,而不是從開始或遺失事件開始。

變更串流中以時間為基礎的操作提供不同的方法來管理變更事件監控的起點。startAtOperationTime() 選項可讓您開始監看在特定時間戳記或之後發生的變更。這些以時間為基礎的功能在需要歷史資料處理、point-in-time復原或系統間同步的情況中特別重要。

下列程式碼範例會擷取與插入文件相關聯的事件、擷取其恢復權杖,然後提供該權杖,以在插入事件之後開始監控事件。事件與更新事件相關聯,然後在發生更新時取得叢集時間,並使用該時間戳記做為進一步處理的起點。

BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }

根據預設,更新變更事件不包含完整文件,而且只包含所做的變更。如果您需要存取已更新的完整文件,您可以呼叫 ChangeStreamIterable 物件上的 fullDocument()方法。請記住,當您要求針對更新事件傳回完整文件時,它會傳回進行變更串流呼叫時存在的文件。

此方法使用FullDocument列舉做為參數。目前,Amazon DocumentDB 僅支援 DEFAULT 和 UPDATE_LOOKUP 值。下列程式碼片段顯示如何在開始監看變更時,要求更新事件的完整文件:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())