本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Amazon DocumentDB 和 Java 進行事件驅動程式設計
Amazon DocumentDB 內容中的事件驅動程式設計代表強大的架構模式,其中資料庫變更做為觸發後續商業邏輯和程序的主要事件產生器。在 DocumentDB 集合中插入、更新或刪除記錄時,這些變更會做為自動啟動各種下游程序、通知或資料同步任務的事件。此模式在現代分散式系統中特別有價值,其中多個應用程式或服務需要即時回應資料變更。在 DocumentDB 中實作事件驅動型程式設計的主要機制是變更串流。
注意
本指南假設您已在正在使用的集合上啟用變更串流。請參閱 以搭配 Amazon DocumentDB 使用變更串流了解如何在集合上啟用變更串流。
從 Java 應用程式使用變更串流
MongoDB Java 驅動程式中的 watch()
方法是監控 Amazon DocumentDB 中即時資料變更的主要機制。MongoClient
MongoDatabase
MongoCollection
watch()
方法。
watch()
方法會傳回支援各種組態選項ChangeStreamIterable
ChangeStreamIterable
Iterable
,並可搭配 使用forEach()
。若要使用 擷取事件forEach()
,請將回呼函數傳遞至 forEach()
以處理變更的事件。下列程式碼片段顯示如何在集合上開啟變更串流,以開始變更事件監控:
ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });
周遊所有變更事件的另一種方法是開啟游標,以維持與叢集的連線,並在發生變更事件時持續接收新的變更事件。若要取得變更串流游標,請使用 ChangeStreamIterable
cursor()
方法。下列程式碼範例示範如何使用游標監控變更事件:
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }
最佳實務是在 try-with-resource 陳述式MongoChangeStreamCursor
cursor()
方法會ChangeStreamIterable
ChangeStreamDocument
MongoChangeStreamCursor
的 。
ChangeStreamDocument
getOperationType()
判斷變更類型、getFullDocument()
存取完整文件狀態,以及getDocumentKey()
識別修改後的文件。
ChangeStreamDocument
在 DocumentDB 變更串流中恢復字符和以時間為基礎的操作,為維護持續性和管理歷史資料存取提供了關鍵機制。恢復權杖是為每個變更事件產生的唯一識別符,做為書籤,允許應用程式在中斷連線或失敗後從特定點重新啟動變更串流處理。建立變更串流游標時,可以透過 resumeAfter()
選項使用先前儲存的恢復權杖,讓串流從停止的位置繼續,而不是從開始或遺失事件開始。
變更串流中以時間為基礎的操作提供不同的方法來管理變更事件監控的起點。startAtOperationTime()
選項可讓您開始監看在特定時間戳記或之後發生的變更。這些以時間為基礎的功能在需要歷史資料處理、point-in-time復原或系統間同步的情況中特別重要。
下列程式碼範例會擷取與插入文件相關聯的事件、擷取其恢復權杖,然後提供該權杖,以在插入事件之後開始監控事件。事件與更新事件相關聯,然後在發生更新時取得叢集時間,並使用該時間戳記做為進一步處理的起點。
BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }
根據預設,更新變更事件不包含完整文件,而且只包含所做的變更。如果您需要存取已更新的完整文件,您可以呼叫 ChangeStreamIterable
fullDocument()
方法。請記住,當您要求針對更新事件傳回完整文件時,它會傳回進行變更串流呼叫時存在的文件。
此方法使用FullDocument
UPDATE_LOOKUP
值。下列程式碼片段顯示如何在開始監看變更時,要求更新事件的完整文件:
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())