Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Ereignisgesteuerte Programmierung mit Amazon DocumentDB und Java
Die ereignisgesteuerte Programmierung im Kontext von Amazon DocumentDB stellt ein leistungsstarkes Architekturmuster dar, bei dem Datenbankänderungen als primäre Ereignisgeneratoren dienen, die nachfolgende Geschäftslogik und -prozesse auslösen. Wenn Datensätze in eine DocumentDB-Auflistung eingefügt, aktualisiert oder gelöscht werden, wirken sich diese Änderungen als Ereignisse aus, die automatisch verschiedene nachgelagerte Prozesse, Benachrichtigungen oder Datensynchronisierungsaufgaben auslösen. Dieses Muster ist besonders nützlich in modernen verteilten Systemen, in denen mehrere Anwendungen oder Dienste in Echtzeit auf Datenänderungen reagieren müssen. Der Hauptmechanismus für die Implementierung ereignisgesteuerter Programmierung in DocumentDB sind Change-Streams.
Anmerkung
In diesem Handbuch wird davon ausgegangen, dass Sie Change-Streams für eine Sammlung aktiviert haben, mit der Sie arbeiten. Weitere Informationen Change-Streams mit Amazon DocumentDB verwenden zum Aktivieren von Change-Streams in der Sammlung finden Sie unter.
Arbeiten mit Change-Streams aus der Java-Anwendung
Die watch()
Methode im Java-Treiber von MongoDB ist der wichtigste Mechanismus zur Überwachung von Datenänderungen in Echtzeit in Amazon DocumentDB. Die watch()
Methode kann von Objekten MongoClient
MongoDatabase
MongoCollection
Die watch()
Methode gibt eine Instanz zurück ChangeStreamIterable
ChangeStreamIterable
Iterable
und kann mit verwendet werden. forEach()
Um Ereignisse mithilfe von zu erfassenforEach()
, übergeben Sie eine Callback-Funktion anforEach()
, die das geänderte Ereignis verarbeitet. Der folgende Codeausschnitt zeigt, wie Sie einen Change-Stream für eine Sammlung öffnen, um die Überwachung von Änderungsereignissen zu starten:
ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });
Eine andere Möglichkeit, alle Änderungsereignisse zu durchlaufen, besteht darin, einen Cursor zu öffnen, der eine Verbindung zum Cluster aufrechterhält und kontinuierlich neue Änderungsereignisse empfängt, sobald sie auftreten. Um einen Change-Streams-Cursor zu erhalten, verwenden Sie die cursor()
Methode object. ChangeStreamIterable
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }
Es hat sich bewährt, entweder den MongoChangeStreamCursor
cursor()
Methode on wird eine ChangeStreamIterable
MongoChangeStreamCursor
, die über einem ChangeStreamDocument
Die ChangeStreamDocument
getOperationType()
für die Bestimmung der Art der Änderung, für den getFullDocument()
Zugriff auf den vollständigen Status des Dokuments und getDocumentKey()
für die Identifizierung des geänderten Dokuments.
Das ChangeStreamDocument
Resume-Tokens und zeitbasierte Operationen in DocumentDB-Change-Streams bieten wichtige Mechanismen für die Aufrechterhaltung der Kontinuität und die Verwaltung des Zugriffs auf historische Daten. Ein Resume-Token ist eine eindeutige Kennung, die für jedes Änderungsereignis generiert wird. Sie dient als Lesezeichen, mit dem Anwendungen die Verarbeitung von Change-Streams nach Verbindungsabbrüchen oder Ausfällen an einem bestimmten Punkt wieder aufnehmen können. Wenn ein Change-Stream-Cursor erstellt wird, kann er über resumeAfter()
diese Option ein zuvor gespeichertes Resume-Token verwenden, sodass der Stream dort weitergeführt werden kann, wo er aufgehört hat, anstatt am Anfang zu beginnen oder Ereignisse zu verlieren.
Zeitbasierte Operationen in Change-Streams bieten unterschiedliche Ansätze, um den Ausgangspunkt der Überwachung von Änderungsereignissen zu verwalten. Mit dieser startAtOperationTime()
Option können Sie beginnen, Änderungen zu beobachten, die zu oder nach einem bestimmten Zeitstempel vorgenommen wurden. Diese zeitbasierten Funktionen sind besonders nützlich in Szenarien, in denen historische Daten verarbeitet, point-in-time wiederhergestellt oder zwischen Systemen synchronisiert werden müssen.
Im folgenden Codebeispiel wird das Ereignis abgerufen, das mit dem Einfügedokument verknüpft ist, dessen Resume-Token erfasst und dann dieses Token bereitgestellt, um mit der Überwachung auf Ereignisse nach dem Einfügeereignis zu beginnen. Das Ereignis wird dem Aktualisierungsereignis zugeordnet, ruft dann die Clusterzeit ab, zu der die Aktualisierung stattgefunden hat, und verwendet diesen Zeitstempel als Ausgangspunkt für die weitere Verarbeitung.
BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }
Standardmäßig umfasst das Aktualisierungsänderungsereignis nicht das gesamte Dokument und nur die vorgenommenen Änderungen. Wenn Sie auf das gesamte Dokument zugreifen müssen, das aktualisiert wurde, können Sie die fullDocument()
Methode für das ChangeStreamIterable
Diese Methode verwendet eine FullDocument
UPDATE_LOOKUP
Werte. Der folgende Codeausschnitt zeigt, wie Sie das vollständige Dokument für Aktualisierungsereignisse anfordern, wenn Sie beginnen, nach Änderungen zu suchen:
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())