Programmazione basata sugli eventi con Amazon DocumentDB e Java - Amazon DocumentDB

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Programmazione basata sugli eventi con Amazon DocumentDB e Java

La programmazione basata sugli eventi nel contesto di Amazon DocumentDB rappresenta un potente modello architettonico in cui le modifiche al database fungono da generatori di eventi principali che attivano la logica e i processi aziendali successivi. Quando i record vengono inseriti, aggiornati o eliminati in una raccolta DocumentDB, queste modifiche agiscono come eventi che avviano automaticamente vari processi, notifiche o attività di sincronizzazione dei dati a valle. Questo modello è particolarmente utile nei moderni sistemi distribuiti in cui più applicazioni o servizi devono reagire alle modifiche dei dati in tempo reale. Il meccanismo principale di implementazione della programmazione basata sugli eventi in DocumentDB è costituito dai flussi di modifiche.

Nota

Questa guida presuppone che tu abbia abilitato i flussi di modifica su una raccolta con cui stai lavorando. Scopri come abilitare i flussi di modifica nella raccolta. Utilizzo dei flussi di modifica con Amazon DocumentDB

Utilizzo dei flussi di modifica dall'applicazione Java

Il watch() metodo del driver Java di MongoDB è il meccanismo principale per monitorare le modifiche dei dati in tempo reale in Amazon DocumentDB. Il watch() metodo può essere richiamato da MongoClientMongoDatabase, e objects. MongoCollection

Il watch() metodo restituisce un'istanza ChangeStreamIterableche supporta varie opzioni di configurazione, tra cui la ricerca completa dei documenti per gli aggiornamenti, la fornitura di token di curriculum e timestamp per l'affidabilità e le fasi di aggregazione della pipeline per filtrare le modifiche.

ChangeStreamIterableimplementa l'interfaccia Java di base e può essere utilizzato con. Iterable forEach() Per acquisire gli eventi utilizzandoforEach(), passa una funzione di callback forEach() che elabora l'evento modificato. Il seguente frammento di codice mostra come aprire un flusso di modifiche su una raccolta per avviare il monitoraggio degli eventi di modifica:

ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });

Un altro modo per attraversare tutti gli eventi di modifica consiste nell'aprire un cursore che mantiene una connessione al cluster e riceve continuamente nuovi eventi di modifica man mano che si verificano. Per ottenere un cursore Change Streams, utilizzate il metodo dell'oggetto. cursor() ChangeStreamIterable Il seguente esempio di codice mostra come monitorare gli eventi di modifica utilizzando il cursore:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }

Come procedura ottimale, create il comando MongoChangeStreamCursorin un' try-with-resourceistruzione o chiudete manualmente il cursore. La chiamata al cursor() metodo on ChangeStreamIterableMongoChangeStreamCursorrestituisce un messaggio creato su un ChangeStreamDocumentoggetto.

La ChangeStreamDocumentclasse è un componente cruciale che rappresenta i singoli eventi di modifica nello stream. Contiene informazioni dettagliate su ogni modifica, incluso il tipo di operazione (inserimento, aggiornamento, eliminazione, sostituzione), la chiave del documento, le informazioni sullo spazio dei nomi e il contenuto completo del documento, se disponibile. La classe fornisce metodi per accedere a vari aspetti dell'evento di modifica, getOperationType() ad esempio per determinare il tipo di modifica, accedere getFullDocument() allo stato completo del documento e getDocumentKey() identificare il documento modificato.

L'ChangeStreamDocumentoggetto fornisce due informazioni importanti, un token di ripristino e l'ora dell'evento di modifica.

I token di ripresa e le operazioni basate sul tempo nei flussi di modifica di DocumentDB forniscono meccanismi cruciali per mantenere la continuità e gestire l'accesso ai dati storici. Un token di ripristino è un identificatore univoco generato per ogni evento di modifica, che funge da segnalibro che consente alle applicazioni di riavviare l'elaborazione del flusso di modifiche da un punto specifico dopo disconnessioni o errori. Quando viene creato un cursore del flusso di modifica, può utilizzare un token di ripristino precedentemente memorizzato tramite resumeAfter() l'opzione, che consente allo stream di continuare da dove era stato interrotto anziché ricominciare dall'inizio o perdere gli eventi.

Le operazioni basate sul tempo nei flussi di modifica offrono approcci diversi per gestire il punto di partenza del monitoraggio degli eventi di modifica. L'startAtOperationTime()opzione consente di iniziare a guardare le modifiche avvenute in corrispondenza o dopo un determinato timestamp. Queste funzionalità basate sul tempo sono particolarmente utili in scenari che richiedono l'elaborazione, il point-in-time ripristino o la sincronizzazione dei dati storici tra i sistemi.

Il seguente esempio di codice recupera l'evento associato al documento di inserimento, ne acquisisce il token di ripristino e quindi lo fornisce per avviare il monitoraggio degli eventi successivi all'evento di inserimento. L'evento viene associato all'evento di aggiornamento, quindi ottiene l'ora del cluster in cui è avvenuto l'aggiornamento e utilizza tale timestamp come punto di partenza per l'ulteriore elaborazione.

BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }

Per impostazione predefinita, l'evento di modifica dell'aggiornamento non include il documento completo e include solo le modifiche apportate. Se è necessario accedere al documento completo che è stato aggiornato, è possibile chiamare il fullDocument() metodo sull'ChangeStreamIterableoggetto. Tenete presente che quando richiedete la restituzione di un documento completo per un evento di aggiornamento, viene restituito il documento esistente al momento della chiamata per modificare i flussi.

Questo metodo accetta un FullDocumentenum come parametro. Attualmente, Amazon DocumentDB supporta solo DEFAULT e UPDATE_LOOKUP valori. Il seguente frammento di codice mostra come richiedere il documento completo per gli eventi di aggiornamento quando si inizia a controllare le modifiche:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())