Programmation pilotée par les événements avec Amazon DocumentDB et Java - Amazon DocumentDB

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Programmation pilotée par les événements avec Amazon DocumentDB et Java

La programmation pilotée par les événements dans le contexte d'Amazon DocumentDB représente un modèle architectural puissant dans lequel les modifications de base de données sont les principaux générateurs d'événements qui déclenchent la logique et les processus métier ultérieurs. Lorsque des enregistrements sont insérés, mis à jour ou supprimés dans une collection DocumentDB, ces modifications agissent comme des événements qui déclenchent automatiquement divers processus, notifications ou tâches de synchronisation de données en aval. Ce modèle est particulièrement utile dans les systèmes distribués modernes où de multiples applications ou services doivent réagir aux modifications des données en temps réel. Le principal mécanisme de mise en œuvre de la programmation événementielle dans DocumentDB repose sur les flux de modifications.

Note

Ce guide part du principe que vous avez activé les flux de modifications sur une collection avec laquelle vous travaillez. Consultez Utilisation de flux de modifications avec Amazon DocumentDB pour savoir comment activer les flux de modifications dans la collection.

Utilisation des flux de modifications à partir de l'application Java

La watch() méthode du pilote Java de MongoDB est le principal mécanisme de surveillance des modifications des données en temps réel dans Amazon DocumentDB. La watch() méthode peut être appelée par MongoClientMongoDatabase, et MongoCollectiondes objets.

La watch() méthode renvoie une instance ChangeStreamIterablequi prend en charge diverses options de configuration, notamment la recherche complète de documents pour les mises à jour, la fourniture de jetons de CV et d'horodatage pour des raisons de fiabilité, et les étapes d'agrégation de pipeline pour filtrer les modifications.

ChangeStreamIterableimplémente l'interface Java de base Iterable et peut être utilisé avecforEach(). Pour capturer des événements à l'aide deforEach(), transmettez une fonction de rappel forEach() qui traite l'événement modifié. L'extrait de code suivant montre comment ouvrir un flux de modifications dans une collection pour démarrer le suivi des événements de changement :

ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });

Une autre façon de parcourir tous les événements de modification consiste à ouvrir un curseur qui maintient une connexion avec le cluster et reçoit en permanence les nouveaux événements de modification au fur et à mesure qu'ils se produisent. Pour obtenir un curseur de flux de modifications, utilisez la cursor() méthode de l'ChangeStreamIterableobjet. L'exemple de code suivant montre comment surveiller les événements de changement à l'aide du curseur :

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }

Il est recommandé de créer le MongoChangeStreamCursordans une try-with-resource instruction ou de fermer manuellement le curseur. L'appel de la cursor() méthode on ChangeStreamIterablerenvoie un MongoChangeStreamCursor qui est créé sur un ChangeStreamDocumentobjet.

La ChangeStreamDocumentclasse est un élément crucial qui représente les événements de changement individuels dans le flux. Il contient des informations détaillées sur chaque modification, notamment le type d'opération (insertion, mise à jour, suppression, remplacement), la clé du document, les informations sur l'espace de noms et le contenu complet du document lorsqu'il est disponible. La classe fournit des méthodes permettant d'accéder à divers aspects de l'événement de changement, tels que getOperationType() la détermination du type de modification, l'accès getFullDocument() à l'état complet du document et getDocumentKey() l'identification du document modifié.

L'ChangeStreamDocumentobjet fournit deux informations importantes : un jeton de CV et l'heure de l'événement de changement.

Les jetons de CV et les opérations basées sur le temps dans les flux de modifications de DocumentDB fournissent des mécanismes essentiels pour maintenir la continuité et gérer l'accès aux données historiques. Un jeton de reprise est un identifiant unique généré pour chaque événement de changement, servant de signet permettant aux applications de redémarrer le traitement du flux de modifications à partir d'un point spécifique après une déconnexion ou un échec. Lorsqu'un curseur de flux de modifications est créé, il peut utiliser un jeton de reprise précédemment stocké via l'resumeAfter()option, permettant au flux de continuer là où il s'est arrêté plutôt que de recommencer depuis le début ou de perdre des événements.

Les opérations basées sur le temps dans les flux de changement proposent différentes approches pour gérer le point de départ de la surveillance des événements de changement. startAtOperationTime()Cette option vous permet de commencer à observer les modifications survenues pendant ou après un horodatage spécifique. Ces fonctionnalités basées sur le temps sont particulièrement utiles dans les scénarios nécessitant le traitement, la point-in-time restauration ou la synchronisation des données historiques entre les systèmes.

L'exemple de code suivant récupère l'événement associé au document d'insertion, capture son jeton de reprise, puis fournit ce jeton pour commencer à surveiller les événements après l'événement d'insertion. L'événement est associé à l'événement de mise à jour, puis obtient l'heure du cluster lorsque la mise à jour a eu lieu et utilise cet horodatage comme point de départ pour un traitement ultérieur.

BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }

Par défaut, l'événement de modification de mise à jour n'inclut pas le document complet et inclut uniquement les modifications apportées. Si vous devez accéder à l'intégralité du document mis à jour, vous pouvez appeler la fullDocument() méthode sur l'ChangeStreamIterableobjet. N'oubliez pas que lorsque vous demandez le renvoi d'un document complet pour un événement de mise à jour, il renvoie le document qui existe au moment de l'appel pour modifier les flux.

Cette méthode prend une FullDocumenténumération comme paramètre. Actuellement, Amazon DocumentDB ne prend en charge que les valeurs DEFAULT et les valeursUPDATE_LOOKUP. L'extrait de code suivant montre comment demander un document complet pour les événements de mise à jour lorsque vous commencez à surveiller les modifications :

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())