Amazon DocumentDB と Java を使用したイベント駆動型プログラミング - Amazon DocumentDB

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon DocumentDB と Java を使用したイベント駆動型プログラミング

Amazon DocumentDB のコンテキストにおけるイベント駆動型プログラミングは、データベースの変更が後続のビジネスロジックとプロセスをトリガーする主要なイベントジェネレーターとして機能する強力なアーキテクチャパターンを表します。DocumentDB コレクションでレコードが挿入、更新、または削除されると、これらの変更はさまざまなダウンストリームプロセス、通知、またはデータ同期タスクを自動的に開始するイベントとして機能します。このパターンは、複数のアプリケーションまたはサービスがデータ変更にリアルタイムで対応する必要がある最新の分散システムでは特に重要です。DocumentDB でイベント駆動型プログラミングを実装する主なメカニズムは、変更ストリームです。

注記

このガイドでは、作業中のコレクションで変更ストリームを有効にしていることを前提としています。コレクションで変更ストリームを有効にするAmazon DocumentDB を用いた変更ストリームの使用方法については、「」を参照してください。

Java アプリケーションからの変更ストリームの使用

MongoDB の Java ドライバーの watch()メソッドは、Amazon DocumentDB でリアルタイムのデータ変更をモニタリングするための主要なメカニズムです。watch() メソッドは、MongoClient、、MongoDatabaseおよび MongoCollection オブジェクトによって で呼び出すことができます。

このwatch()メソッドは、更新の完全なドキュメント検索、信頼性のための再開トークンとタイムスタンプの提供、変更をフィルタリングするためのパイプライン集約ステージなど、さまざまな設定オプションChangeStreamIterableをサポートする のインスタンスを返します。

ChangeStreamIterable はコア Java インターフェイスを実装Iterableしており、 で使用できますforEach()。を使用してイベントをキャプチャするにはforEach()、変更されたイベントforEach()を処理するコールバック関数を に渡します。次のコードスニペットは、コレクションで変更ストリームを開いて変更イベントのモニタリングを開始する方法を示しています。

ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });

すべての変更イベントを通過するもう 1 つの方法は、クラスターへの接続を維持し、発生時に新しい変更イベントを継続的に受信するカーソルを開くことです。変更ストリームカーソルを取得するには、 ChangeStreamIterable オブジェクトの cursor()メソッドを使用します。次のコード例は、カーソルを使用して変更イベントをモニタリングする方法を示しています。

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }

ベストプラクティスとして、 try-with-resource ステートメントMongoChangeStreamCursorで を作成するか、カーソルを手動で閉じます。で cursor()メソッドを呼び出すと、 ChangeStreamDocument オブジェクト上に作成された MongoChangeStreamCursor ChangeStreamIterableが返されます。

ChangeStreamDocument クラスは、ストリーム内の個々の変更イベントを表す重要なコンポーネントです。これには、オペレーションタイプ (挿入、更新、削除、置換)、ドキュメントキー、名前空間情報、および利用可能な場合の完全なドキュメントコンテンツなど、各変更に関する詳細情報が含まれます。クラスは、変更のタイプgetOperationType()の決定、完全なドキュメント状態getFullDocument()へのアクセス、変更されたドキュメントのgetDocumentKey()識別など、変更イベントのさまざまな側面にアクセスする方法を提供します。

ChangeStreamDocument オブジェクトは、再開トークンと変更イベントの時刻の 2 つの重要な情報を提供します。

DocumentDB 変更ストリームでのトークンの再開と時間ベースのオペレーションは、継続性を維持し、履歴データアクセスを管理するための重要なメカニズムを提供します。再開トークンは、変更イベントごとに生成される一意の識別子であり、切断または障害後にアプリケーションが特定のポイントから変更ストリーム処理を再開できるようにするブックマークとして機能します。変更ストリームカーソルを作成すると、 resumeAfter()オプションを使用して以前に保存した再開トークンを使用できます。これにより、ストリームは最初から開始したり、イベントを失うのではなく、中断した場所から続行できます。

変更ストリームの時間ベースのオペレーションは、変更イベントモニタリングの開始点を管理するためのさまざまなアプローチを提供します。startAtOperationTime() オプションを使用すると、特定のタイムスタンプ以降に発生した変更の監視を開始できます。これらの時間ベースの機能は、履歴データ処理、point-in-timeリカバリ、またはシステム間の同期を必要とするシナリオで特に役立ちます。

次のコード例は、挿入ドキュメントに関連付けられたイベントを取得し、再開トークンをキャプチャしてから、挿入イベントの後にイベントのモニタリングを開始するトークンを提供します。イベントは更新イベントに関連付けられ、更新が発生したクラスター時間を取得し、そのタイムスタンプをさらなる処理の開始点として使用します。

BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }

デフォルトでは、更新変更イベントには完全なドキュメントが含まれず、加えられた変更のみが含まれます。更新した完全なドキュメントにアクセスする必要がある場合は、 ChangeStreamIterable オブジェクトで fullDocument()メソッドを呼び出すことができます。更新イベントに対して完全なドキュメントが返されるように要求すると、ストリームを変更する呼び出しが行われた時点で存在するドキュメントが返されることに注意してください。

このメソッドは、FullDocument列挙型をパラメータとして受け取ります。現在、Amazon DocumentDB は DEFAULT および UPDATE_LOOKUP値のみをサポートしています。次のコードスニペットは、変更の監視を開始するときに、更新イベントの完全なドキュメントを要求する方法を示しています。

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())