Programación basada en eventos con Amazon DocumentDB y Java - Amazon DocumentDB

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Programación basada en eventos con Amazon DocumentDB y Java

La programación basada en eventos en el contexto de Amazon DocumentDB representa un poderoso patrón arquitectónico en el que los cambios en la base de datos actúan como los principales generadores de eventos que desencadenan la lógica empresarial y los procesos posteriores. Cuando se insertan, actualizan o eliminan registros en una colección de DocumentDB, estos cambios actúan como eventos que inician automáticamente varios procesos posteriores, notificaciones o tareas de sincronización de datos. Este patrón es particularmente valioso en los sistemas distribuidos modernos, donde varias aplicaciones o servicios deben reaccionar a los cambios en los datos en tiempo real. El mecanismo principal de implementación de la programación basada en eventos en DocumentDB es mediante flujos de cambios.

nota

Esta guía asume que ha habilitado los flujos de cambios en una colección con la que está trabajando. Consulte Uso de flujos de cambios con Amazon DocumentDB para obtener información sobre cómo habilitar los flujos de cambios en la colección.

Trabajar con flujos de cambios desde la aplicación Java

El watch() método del controlador Java de MongoDB es el mecanismo principal para supervisar los cambios de datos en tiempo real en Amazon DocumentDB. Los watch() objetos y y pueden invocar el MongoClientmétodo. MongoDatabaseMongoCollection

El watch() método devuelve una instancia ChangeStreamIterableque admite varias opciones de configuración, entre las que se incluyen la búsqueda completa de actualizaciones en los documentos, el suministro de fichas de currículum y una marca de tiempo para garantizar la fiabilidad, y las etapas de agregación de canalizaciones para filtrar los cambios.

ChangeStreamIterableimplementa la interfaz básica de Java Iterable y se puede usar con. forEach() Para capturar eventos medianteforEach(), transfiera una función de devolución de llamada que forEach() procese el evento modificado. En el siguiente fragmento de código, se muestra cómo abrir un flujo de cambios en una colección para iniciar la supervisión de los eventos de cambio:

ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });

Otra forma de recorrer todos los eventos de cambio consiste en abrir un cursor que mantenga una conexión con el clúster y reciba continuamente nuevos eventos de cambio a medida que se producen. Para obtener un cursor de flujo de cambios, utilice el cursor() método de ChangeStreamIterableobjeto. El siguiente ejemplo de código muestra cómo supervisar los eventos de cambio mediante el cursor:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }

Como práctica recomendada, puede crearla MongoChangeStreamCursoren una try-with-resource sentencia o cerrar el cursor manualmente. Si se cursor() activa el método, MongoChangeStreamCursor se ChangeStreamIterabledevuelve un objeto creado sobre un ChangeStreamDocumentobjeto.

La ChangeStreamDocumentclase es un componente crucial que representa los eventos de cambio individuales en la secuencia. Contiene información detallada sobre cada modificación, incluido el tipo de operación (insertar, actualizar, eliminar, reemplazar), la clave del documento, la información del espacio de nombres y el contenido completo del documento, cuando esté disponible. La clase proporciona métodos para acceder a varios aspectos del evento de cambio, como getOperationType() determinar el tipo de cambio, acceder getFullDocument() al estado completo del documento e getDocumentKey() identificar el documento modificado.

El ChangeStreamDocumentobjeto proporciona dos datos importantes: un token de currículum y la hora del evento de cambio.

Los tokens de reanudación y las operaciones basadas en el tiempo en los flujos de cambio de DocumentDB proporcionan mecanismos cruciales para mantener la continuidad y administrar el acceso a los datos históricos. Un token de currículum es un identificador único que se genera para cada evento de cambio y sirve como marcador que permite a las aplicaciones reiniciar el procesamiento del flujo de cambios desde un punto específico tras una desconexión o un fallo. Cuando se crea un cursor de flujo de cambios, se puede utilizar un token de currículum previamente almacenado a través de resumeAfter() esta opción, lo que permite que la transmisión continúe desde donde la dejó en lugar de empezar desde el principio o perder los eventos.

Las operaciones basadas en el tiempo en los flujos de cambios ofrecen diferentes enfoques para gestionar el punto de partida de la supervisión de los eventos de cambio. La startAtOperationTime() opción le permite empezar a observar los cambios que se produjeron en una marca de tiempo específica o después de ella. Estas funciones basadas en el tiempo son particularmente valiosas en escenarios que requieren el procesamiento, la point-in-time recuperación o la sincronización de datos históricos entre sistemas.

El siguiente ejemplo de código recupera el evento asociado al documento de inserción, captura su token de currículum y, a continuación, proporciona ese token para empezar a supervisar los eventos posteriores al evento de inserción. El evento está asociado al evento de actualización y, a continuación, obtiene la hora del clúster en que se produjo la actualización y utiliza esa marca de tiempo como punto de partida para el procesamiento posterior.

BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }

De forma predeterminada, el evento de cambio de actualización no incluye el documento completo y solo incluye los cambios que se realizaron. Si necesita acceder al documento completo que se actualizó, puede llamar al fullDocument() método del ChangeStreamIterableobjeto. Tenga en cuenta que cuando solicita que se devuelva un documento completo para un evento de actualización, se devolverá el documento que existía en el momento en que se realizó la llamada para cambiar de flujo.

Este método toma una FullDocumentenumeración como parámetro. Actualmente, Amazon DocumentDB solo admite valores y UPDATE_LOOKUP DEFAULT. El siguiente fragmento de código muestra cómo solicitar el documento completo para los eventos de actualización cuando se empieza a observar los cambios:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())