使用 Amazon DocumentDB 和 Java 进行事件驱动编程 - Amazon DocumentDB

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon DocumentDB 和 Java 进行事件驱动编程

Amazon DocumentDB 环境中的事件驱动编程代表了一种强大的架构模式,其中数据库更改充当触发后续业务逻辑和流程的主要事件生成器。在 DocumentDB 集合中插入、更新或删除记录时,这些更改充当自动启动各种下游进程、通知或数据同步任务的事件。这种模式在现代分布式系统中特别有价值,在这些系统中,多个应用程序或服务需要对数据变化做出实时反应。在 DocumentDB 中实现事件驱动编程的主要机制是通过变更流。

注意

本指南假设您已在使用的集合上启用了更改流。将变更流与 Amazon DocumentDB 结合使用要了解如何在集合上启用变更流,请参阅。

处理来自 Java 应用程序的更改流

MongoDB 的 Java 驱动程序中的watch()方法是监控 Amazon DocumentDB 中实时数据变化的主要机制。该watch()方法可以由MongoClientMongoDatabase、和MongoCollection对象调用。

watch()方法返回一个支持各种配置选项的ChangeStreamIterable实例,包括用于更新的完整文档查找、提供简历令牌和时间戳以提高可靠性,以及用于筛选更改的管道聚合阶段。

ChangeStreamIterable实现了核心 Java 接口Iterable,可以与forEach()。要使用捕获事件forEach(),请将回调函数传递给forEach()处理已更改的事件。以下代码片段显示了如何打开集合上的更改流以启动更改事件监控:

ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });

遍历所有更改事件的另一种方法是打开游标,该游标可保持与集群的连接,并在发生新的更改事件时持续接收这些事件。要获取更改流游标,请使用 ChangeStreamIterableobject cursor() 的方法。以下代码示例显示了如何使用游标监控变更事件:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }

最佳做法是,要么在 try-with-resource语句MongoChangeStreamCursor中创建,要么手动关闭游标。在上调用该cursor()方法MongoChangeStreamCursorChangeStreamIterable返回在ChangeStreamDocument对象上创建的。

ChangeStreamDocument类是代表流中各个变更事件的关键组件。它包含有关每项修改的详细信息,包括操作类型(插入、更新、删除、替换)、文档密钥、命名空间信息以及完整的文档内容(如果有)。该类提供了访问变更事件各个方面的方法,例如getOperationType()确定更改的类型、getFullDocument()访问完整的文档状态以及getDocumentKey()识别修改后的文档。

ChangeStreamDocument对象提供两条重要信息,即恢复令牌和变更事件时间。

DocumentDB 变更流中的简历令牌和基于时间的操作为保持连续性和管理历史数据访问提供了关键机制。恢复令牌是为每个更改事件生成的唯一标识符,用作书签,允许应用程序在断开连接或失败后从特定时间点重新启动更改流处理。创建更改流光标时,它可以通过该resumeAfter()选项使用先前存储的恢复令牌,从而使流能够从上次停下来的地方继续,而不是从头开始或丢失事件。

变更流中基于时间的操作提供了不同的方法来管理变更事件监控的起点。该startAtOperationTime()选项允许您开始关注在特定时间戳或之后发生的更改。这些基于时间的功能在需要历史数据处理、 point-in-time恢复或系统间同步的场景中特别有价值。

以下代码示例检索与插入文档关联的事件,捕获其恢复令牌,然后提供该令牌以在插入事件之后开始监视事件。该事件与更新事件关联,然后获取更新发生时的集群时间,并使用该时间戳作为进一步处理的起点。

BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }

默认情况下,update change 事件不包括完整的文档,它只包括所做的更改。如果您需要访问已更新的完整文档,则可以在ChangeStreamIterable对象上调用fullDocument()方法。请记住,当您要求返回更新事件的完整文档时,它会返回调用更改流时存在的文档。

此方法采用FullDocument枚举作为参数。目前,亚马逊 DocumentDB 仅支持默认值和值。UPDATE_LOOKUP以下代码片段显示了在开始监视更改时如何要求提供更新事件的完整文档:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())