Ereignisgesteuerte Programmierung mit Amazon DocumentDB und Java - Amazon DocumentDB

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Ereignisgesteuerte Programmierung mit Amazon DocumentDB und Java

Die ereignisgesteuerte Programmierung im Kontext von Amazon DocumentDB stellt ein leistungsstarkes Architekturmuster dar, bei dem Datenbankänderungen als primäre Ereignisgeneratoren dienen, die nachfolgende Geschäftslogik und -prozesse auslösen. Wenn Datensätze in eine DocumentDB-Auflistung eingefügt, aktualisiert oder gelöscht werden, wirken sich diese Änderungen als Ereignisse aus, die automatisch verschiedene nachgelagerte Prozesse, Benachrichtigungen oder Datensynchronisierungsaufgaben auslösen. Dieses Muster ist besonders nützlich in modernen verteilten Systemen, in denen mehrere Anwendungen oder Dienste in Echtzeit auf Datenänderungen reagieren müssen. Der Hauptmechanismus für die Implementierung ereignisgesteuerter Programmierung in DocumentDB sind Change-Streams.

Anmerkung

In diesem Handbuch wird davon ausgegangen, dass Sie Change-Streams für eine Sammlung aktiviert haben, mit der Sie arbeiten. Weitere Informationen Change-Streams mit Amazon DocumentDB verwenden zum Aktivieren von Change-Streams in der Sammlung finden Sie unter.

Arbeiten mit Change-Streams aus der Java-Anwendung

Die watch() Methode im Java-Treiber von MongoDB ist der wichtigste Mechanismus zur Überwachung von Datenänderungen in Echtzeit in Amazon DocumentDB. Die watch() Methode kann von Objekten MongoClientMongoDatabase, und MongoCollectionaufgerufen werden.

Die watch() Methode gibt eine Instanz zurück ChangeStreamIterable, die verschiedene Konfigurationsoptionen unterstützt, darunter die vollständige Suche nach Aktualisierungen in Dokumenten, die Bereitstellung von Wiederaufnahmetokens und Zeitstempeln für die Zuverlässigkeit sowie Pipeline-Aggregationsphasen zum Filtern von Änderungen.

ChangeStreamIterableimplementiert die Java-Kernschnittstelle Iterable und kann mit verwendet werden. forEach() Um Ereignisse mithilfe von zu erfassenforEach(), übergeben Sie eine Callback-Funktion anforEach(), die das geänderte Ereignis verarbeitet. Der folgende Codeausschnitt zeigt, wie Sie einen Change-Stream für eine Sammlung öffnen, um die Überwachung von Änderungsereignissen zu starten:

ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });

Eine andere Möglichkeit, alle Änderungsereignisse zu durchlaufen, besteht darin, einen Cursor zu öffnen, der eine Verbindung zum Cluster aufrechterhält und kontinuierlich neue Änderungsereignisse empfängt, sobald sie auftreten. Um einen Change-Streams-Cursor zu erhalten, verwenden Sie die cursor() Methode object. ChangeStreamIterable Das folgende Codebeispiel zeigt, wie Änderungsereignisse mithilfe des Cursors überwacht werden:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }

Es hat sich bewährt, entweder den MongoChangeStreamCursorin einer try-with-resource Anweisung zu erstellen oder den Cursor manuell zu schließen. Beim Aufrufen der cursor() Methode on wird eine ChangeStreamIterablezurückgegebenMongoChangeStreamCursor, die über einem ChangeStreamDocumentObjekt erstellt wurde.

Die ChangeStreamDocumentKlasse ist eine wichtige Komponente, die einzelne Änderungsereignisse im Stream darstellt. Sie enthält detaillierte Informationen zu jeder Änderung, einschließlich des Vorgangstyps (Einfügen, Aktualisieren, Löschen, Ersetzen), des Dokumentschlüssels, der Namespace-Informationen und des vollständigen Dokumentinhalts, sofern verfügbar. Die Klasse bietet Methoden für den Zugriff auf verschiedene Aspekte des Änderungsereignisses, z. B. getOperationType() für die Bestimmung der Art der Änderung, für den getFullDocument() Zugriff auf den vollständigen Status des Dokuments und getDocumentKey() für die Identifizierung des geänderten Dokuments.

Das ChangeStreamDocumentObjekt stellt zwei wichtige Informationen bereit: ein Wiederaufnahme-Token und die Uhrzeit des Änderungsereignisses.

Resume-Tokens und zeitbasierte Operationen in DocumentDB-Change-Streams bieten wichtige Mechanismen für die Aufrechterhaltung der Kontinuität und die Verwaltung des Zugriffs auf historische Daten. Ein Resume-Token ist eine eindeutige Kennung, die für jedes Änderungsereignis generiert wird. Sie dient als Lesezeichen, mit dem Anwendungen die Verarbeitung von Change-Streams nach Verbindungsabbrüchen oder Ausfällen an einem bestimmten Punkt wieder aufnehmen können. Wenn ein Change-Stream-Cursor erstellt wird, kann er über resumeAfter() diese Option ein zuvor gespeichertes Resume-Token verwenden, sodass der Stream dort weitergeführt werden kann, wo er aufgehört hat, anstatt am Anfang zu beginnen oder Ereignisse zu verlieren.

Zeitbasierte Operationen in Change-Streams bieten unterschiedliche Ansätze, um den Ausgangspunkt der Überwachung von Änderungsereignissen zu verwalten. Mit dieser startAtOperationTime() Option können Sie beginnen, Änderungen zu beobachten, die zu oder nach einem bestimmten Zeitstempel vorgenommen wurden. Diese zeitbasierten Funktionen sind besonders nützlich in Szenarien, in denen historische Daten verarbeitet, point-in-time wiederhergestellt oder zwischen Systemen synchronisiert werden müssen.

Im folgenden Codebeispiel wird das Ereignis abgerufen, das mit dem Einfügedokument verknüpft ist, dessen Resume-Token erfasst und dann dieses Token bereitgestellt, um mit der Überwachung auf Ereignisse nach dem Einfügeereignis zu beginnen. Das Ereignis wird dem Aktualisierungsereignis zugeordnet, ruft dann die Clusterzeit ab, zu der die Aktualisierung stattgefunden hat, und verwendet diesen Zeitstempel als Ausgangspunkt für die weitere Verarbeitung.

BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }

Standardmäßig umfasst das Aktualisierungsänderungsereignis nicht das gesamte Dokument und nur die vorgenommenen Änderungen. Wenn Sie auf das gesamte Dokument zugreifen müssen, das aktualisiert wurde, können Sie die fullDocument() Methode für das ChangeStreamIterableObjekt aufrufen. Denken Sie daran, dass, wenn Sie bei einem Aktualisierungsereignis die Rückgabe eines vollständigen Dokuments anfordern, das Dokument zurückgegeben wird, das zum Zeitpunkt des Aufrufs der Change-Streams vorhanden war.

Diese Methode verwendet eine FullDocumentAufzählung als Parameter. Derzeit unterstützt Amazon DocumentDB nur DEFAULT und UPDATE_LOOKUP Werte. Der folgende Codeausschnitt zeigt, wie Sie das vollständige Dokument für Aktualisierungsereignisse anfordern, wenn Sie beginnen, nach Änderungen zu suchen:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())