Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Programmazione basata sugli eventi con Amazon DocumentDB e Java
La programmazione basata sugli eventi nel contesto di Amazon DocumentDB rappresenta un potente modello architettonico in cui le modifiche al database fungono da generatori di eventi principali che attivano la logica e i processi aziendali successivi. Quando i record vengono inseriti, aggiornati o eliminati in una raccolta DocumentDB, queste modifiche agiscono come eventi che avviano automaticamente vari processi, notifiche o attività di sincronizzazione dei dati a valle. Questo modello è particolarmente utile nei moderni sistemi distribuiti in cui più applicazioni o servizi devono reagire alle modifiche dei dati in tempo reale. Il meccanismo principale di implementazione della programmazione basata sugli eventi in DocumentDB è costituito dai flussi di modifiche.
Nota
Questa guida presuppone che tu abbia abilitato i flussi di modifica su una raccolta con cui stai lavorando. Scopri come abilitare i flussi di modifica nella raccolta. Utilizzo dei flussi di modifica con Amazon DocumentDB
Utilizzo dei flussi di modifica dall'applicazione Java
Il watch()
metodo del driver Java di MongoDB è il meccanismo principale per monitorare le modifiche dei dati in tempo reale in Amazon DocumentDB. Il watch()
metodo può essere richiamato da MongoClient
MongoDatabase
MongoCollection
Il watch()
metodo restituisce un'istanza ChangeStreamIterable
ChangeStreamIterable
Iterable
forEach()
Per acquisire gli eventi utilizzandoforEach()
, passa una funzione di callback forEach()
che elabora l'evento modificato. Il seguente frammento di codice mostra come aprire un flusso di modifiche su una raccolta per avviare il monitoraggio degli eventi di modifica:
ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });
Un altro modo per attraversare tutti gli eventi di modifica consiste nell'aprire un cursore che mantiene una connessione al cluster e riceve continuamente nuovi eventi di modifica man mano che si verificano. Per ottenere un cursore Change Streams, utilizzate il metodo dell'oggetto. cursor()
ChangeStreamIterable
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }
Come procedura ottimale, create il comando MongoChangeStreamCursor
cursor()
metodo on ChangeStreamIterable
MongoChangeStreamCursor
restituisce un messaggio creato su un ChangeStreamDocument
La ChangeStreamDocument
getOperationType()
ad esempio per determinare il tipo di modifica, accedere getFullDocument()
allo stato completo del documento e getDocumentKey()
identificare il documento modificato.
L'ChangeStreamDocument
I token di ripresa e le operazioni basate sul tempo nei flussi di modifica di DocumentDB forniscono meccanismi cruciali per mantenere la continuità e gestire l'accesso ai dati storici. Un token di ripristino è un identificatore univoco generato per ogni evento di modifica, che funge da segnalibro che consente alle applicazioni di riavviare l'elaborazione del flusso di modifiche da un punto specifico dopo disconnessioni o errori. Quando viene creato un cursore del flusso di modifica, può utilizzare un token di ripristino precedentemente memorizzato tramite resumeAfter()
l'opzione, che consente allo stream di continuare da dove era stato interrotto anziché ricominciare dall'inizio o perdere gli eventi.
Le operazioni basate sul tempo nei flussi di modifica offrono approcci diversi per gestire il punto di partenza del monitoraggio degli eventi di modifica. L'startAtOperationTime()
opzione consente di iniziare a guardare le modifiche avvenute in corrispondenza o dopo un determinato timestamp. Queste funzionalità basate sul tempo sono particolarmente utili in scenari che richiedono l'elaborazione, il point-in-time ripristino o la sincronizzazione dei dati storici tra i sistemi.
Il seguente esempio di codice recupera l'evento associato al documento di inserimento, ne acquisisce il token di ripristino e quindi lo fornisce per avviare il monitoraggio degli eventi successivi all'evento di inserimento. L'evento viene associato all'evento di aggiornamento, quindi ottiene l'ora del cluster in cui è avvenuto l'aggiornamento e utilizza tale timestamp come punto di partenza per l'ulteriore elaborazione.
BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }
Per impostazione predefinita, l'evento di modifica dell'aggiornamento non include il documento completo e include solo le modifiche apportate. Se è necessario accedere al documento completo che è stato aggiornato, è possibile chiamare il fullDocument()
metodo sull'ChangeStreamIterable
Questo metodo accetta un FullDocument
UPDATE_LOOKUP
valori. Il seguente frammento di codice mostra come richiedere il documento completo per gli eventi di aggiornamento quando si inizia a controllare le modifiche:
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())