Transactions - Amazon DocumentDB

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Transactions

Amazon DocumentDB (compatible avec MongoDB) prend désormais en charge la compatibilité avec MongoDB 4.0, y compris les transactions. Vous pouvez effectuer des transactions sur plusieurs documents, relevés, collections et bases de données. Les transactions simplifient le développement d'applications en vous permettant d'effectuer des opérations atomiques, cohérentes, isolées et durables (ACID) sur un ou plusieurs documents au sein d'un cluster Amazon DocumentDB. Les cas d'utilisation courants des transactions incluent le traitement financier, l'exécution et la gestion des commandes et la création de jeux multijoueurs.

Il n'y a aucun coût supplémentaire pour les transactions. Vous ne payez que pour les iOS en lecture et en écriture que vous consommez dans le cadre des transactions.

Prérequis

Pour utiliser la fonction de transactions, vous devez répondre aux exigences suivantes :

  • Vous devez utiliser le moteur Amazon DocumentDB 4.0.

  • Vous devez utiliser un pilote compatible avec MongoDB 4.0 ou supérieur.

Bonnes pratiques

Voici quelques bonnes pratiques pour tirer le meilleur parti des transactions avec Amazon DocumentDB.

  • Validez ou annulez toujours la transaction une fois qu'elle est terminée. Laisser une transaction dans un état incomplet monopolise les ressources de la base de données et peut entraîner des conflits d'écriture.

  • Il est recommandé de limiter les transactions au plus petit nombre de commandes nécessaires. Si vous avez des transactions comportant plusieurs relevés qui peuvent être divisés en plusieurs transactions plus petites, il est conseillé de le faire afin de réduire la probabilité d'un délai d'attente. Essayez toujours de créer des transactions courtes, et non des lectures de longue durée.

Limites

  • Amazon DocumentDB ne prend pas en charge les curseurs dans une transaction.

  • Amazon DocumentDB ne peut pas créer de nouvelles collections dans une transaction et ne peut pas interroger/mettre à jour des collections inexistantes.

  • Les verrous d'écriture au niveau du document sont soumis à un délai d'expiration d'une minute, qui n'est pas configurable par l'utilisateur.

  • Les commandes d'écriture réessayable, de validation réessayable et d'abandon réessayable ne sont pas prises en charge dans Amazon DocumentDB. Exception : Si vous utilisez mongo shell, n'incluez laretryWrites=false commande dans aucune chaîne de code. Par défaut, les écritures réessayables sont désactivées. L'inclusionretryWrites=false peut entraîner l'échec des commandes de lecture normales.

  • Chaque instance Amazon DocumentDB possède une limite supérieure quant au nombre de transactions simultanées ouvertes simultanément sur l'instance. Pour les limites, veuillez consulterLimites d’instance.

  • Pour une transaction donnée, la taille du journal des transactions doit être inférieure à 32 Mo.

  • Amazon DocumentDB prend en chargecount() les transactions au sein d'une transaction, mais tous les pilotes ne prennent pas en charge cette fonctionnalité. Une alternative consiste à utiliser l'countDocuments()API, qui traduit la requête de comptage en une requête d'agrégation côté client.

  • Les transactions ont une limite d'exécution d'une minute et les sessions ont un délai d'expiration de 30 minutes. Si une transaction expire, elle sera abandonnée et toutes les commandes ultérieures émises au cours de la session pour la transaction existante produiront l'erreur suivante :

    WriteCommandError({ "ok" : 0, "operationTime" : Timestamp(1603491424, 627726), "code" : 251, "errmsg" : "Given transaction number 0 does not match any in-progress transactions." })

Surveillance et diagnostic

Avec la prise en charge des transactions dans Amazon DocumentDB 4.0, des CloudWatch indicateurs supplémentaires ont été ajoutés pour vous aider à surveiller vos transactions.

Nouveaux CloudWatch indicateurs

  • DatabaseTransactions: le nombre de transactions ouvertes effectuées sur une période d'une minute.

  • DatabaseTransactionsAborted: le nombre de transactions abandonnées effectuées par période d'une minute.

  • DatabaseTransactionsMax: le nombre maximum de transactions ouvertes sur une période d'une minute.

  • TransactionsAborted: le nombre de transactions abandonnées sur une instance sur une période d'une minute.

  • TransactionsCommitted: le nombre de transactions effectuées sur une instance sur une période d'une minute.

  • TransactionsOpen: le nombre de transactions ouvertes sur une instance prise par période d'une minute.

  • TransactionsOpenMax: le nombre maximum de transactions ouvertes sur une instance par période d'une minute.

  • TransactionsStarted: le nombre de transactions démarrées sur une instance sur une période d'une minute.

Note

Pour en savoir plus sur les CloudWatch statistiques relatives à Amazon DocumentDB, rendez-vous surSurveillance d'Amazon DocumentDB avec CloudWatch.

De plus, de nouveaux champs ont été ajoutés aux deuxcurrentOplsidtransactionThreadId, et un nouvel état pour «idle transaction » etserverStatus les transactions :currentActivecurrentInactivecurrentOpen,totalAborted,totalCommitted, ettotalStarted.

Niveau d'isolement des transactions

Lorsque vous démarrez une transaction, il est possible de spécifier à la fois lereadConcern etwriteConcern comme indiqué dans l'exemple ci-dessous :

mySession.startTransaction({readConcern: {level: 'snapshot'}, writeConcern: {w: 'majority'}});

En effetreadConcern, Amazon DocumentDB prend en charge l'isolation des instantanés par défaut. Si unereadConcern valeur locale, disponible ou majoritaire est spécifiée, Amazon DocumentDB passera aureadConcern niveau instantané. Amazon DocumentDB ne prend pas en charge le linéarisablereadConcern et la spécification d'un tel problème de lecture entraînera une erreur.

En effetwriteConcern, Amazon DocumentDB prend en charge la majorité par défaut et un quorum d'écriture est atteint lorsque quatre copies des données sont conservées sur trois AZ. Si une valeur inférieurewriteConcern est spécifiée, Amazon DocumentDB passewriteConcern à la majorité. En outre, toutes les écritures Amazon DocumentDB sont journalisées et la journalisation ne peut pas être désactivée.

Cas d'utilisation

Dans cette section, nous passerons en revue deux cas d'utilisation des transactions : plusieurs relevés et plusieurs collectes.

Transactions comportant plusieurs états

Les transactions Amazon DocumentDB comportent plusieurs déclarations, ce qui signifie que vous pouvez écrire une transaction qui couvre plusieurs instructions avec un commit ou un rollback explicite. Vous pouvez regrouperinsert,updatedelete, et desfindAndModify actions en une seule opération atomique.

Un cas d'utilisation courant pour les transactions comportant plusieurs états est celui d'une transaction débit/crédit. Par exemple : vous devez de l'argent à un ami pour des vêtements. Ainsi, vous devez débiter (retirer) 500$ de votre compte et créditer 500$ (dépôt) sur le compte de votre ami. Pour effectuer cette opération, vous effectuez à la fois les opérations de dette et de crédit dans le cadre d'une seule transaction afin de garantir l'atomicité. Cela permet d'éviter les scénarios où 500$ sont débités de votre compte, mais pas crédités sur le compte de votre ami. Voici à quoi ressemblerait ce cas d'utilisation :

// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario*** // Setup bank account for Alice and Bob. Each have $1000 in their account var databaseName = "bank"; var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var bankDB = session.getDatabase(databaseName); var accountColl = bankDB[collectionName]; accountColl.drop(); accountColl.insert({name: "Alice", balance: 1000}); accountColl.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountColl.find({"name": "Alice"}).next().balance; var newAliceBalance = aliceBalance - amountToTransfer; accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance; // add $500 to Bob's account var bobBalance = accountColl.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountColl.find({"name": "Bob"}).next().balance; session.commitTransaction(); accountColl.find(); // *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario*** // Setup bank account for Alice and Bob. Each have $1000 in their account var databaseName = "bank"; var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var bankDB = session.getDatabase(databaseName); var accountColl = bankDB[collectionName]; accountColl.drop(); accountColl.insert({name: "Alice", balance: 1000}); accountColl.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountColl.find({"name": "Alice"}).next().balance; var newAliceBalance = aliceBalance - amountToTransfer; accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance; session.abortTransaction();

Transactions à collectes multiples

Nos transactions sont également multi-collections, ce qui signifie qu'elles peuvent être utilisées pour effectuer plusieurs opérations au sein d'une même transaction et sur plusieurs collections. Cela fournit une vue cohérente des données et préserve l'intégrité de vos données. Lorsque vous validez les commandes en une seule<>, les transactions sont all-or-nothing des exécutions, c'est-à-dire qu'elles réussiront toutes ou échoueront toutes.

Voici un exemple de transactions à encaissements multiples, utilisant le même scénario et les mêmes données que l'exemple pour les transactions à relevés multiples.

// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario*** // Setup bank account for Alice and Bob. Each have $1000 in their account var amountToTransfer = 500; var collectionName = "account"; var session = db.getMongo().startSession({causalConsistency: false}); var accountCollInBankA = session.getDatabase("bankA")[collectionName]; var accountCollInBankB = session.getDatabase("bankB")[collectionName]; accountCollInBankA.drop(); accountCollInBankB.drop(); accountCollInBankA.insert({name: "Alice", balance: 1000}); accountCollInBankB.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance; var newAliceBalance = aliceBalance - amountToTransfer; accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance; // add $500 to Bob's account var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance; session.commitTransaction(); accountCollInBankA.find(); // Alice holds $500 in bankA accountCollInBankB.find(); // Bob holds $1500 in bankB // *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario*** // Setup bank account for Alice and Bob. Each have $1000 in their account var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var accountCollInBankA = session.getDatabase("bankA")[collectionName]; var accountCollInBankB = session.getDatabase("bankB")[collectionName]; accountCollInBankA.drop(); accountCollInBankB.drop(); accountCollInBankA.insert({name: "Alice", balance: 1000}); accountCollInBankB.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance; var newAliceBalance = aliceBalance - amountToTransfer; accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance; // add $500 to Bob's account var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance; session.abortTransaction(); accountCollInBankA.find(); // Alice holds $1000 in bankA accountCollInBankB.find(); // Bob holds $1000 in bankB

Exemples d'API de transaction pour l'API de rappel

L'API de rappel n'est disponible que pour les pilotes 4.2+.

Javascript

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Javascript.

// *** Transfer $500 from Alice to Bob inside a transaction: Success *** // Setup bank account for Alice and Bob. Each have $1000 in their account var databaseName = "bank"; var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var bankDB = session.getDatabase(databaseName); var accountColl = bankDB[collectionName]; accountColl.drop(); accountColl.insert({name: "Alice", balance: 1000}); accountColl.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountColl.find({"name": "Alice"}).next().balance; assert(aliceBalance >= amountToTransfer); var newAliceBalance = aliceBalance - amountToTransfer; accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance; assert.eq(newAliceBalance, findAliceBalance); // add $500 to Bob's account var bobBalance = accountColl.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountColl.find({"name": "Bob"}).next().balance; assert.eq(newBobBalance, findBobBalance); session.commitTransaction(); accountColl.find();
Node.js

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Node.js.

// Node.js callback API: const bankDB = await mongoclient.db("bank"); var accountColl = await bankDB.createCollection("account"); var amountToTransfer = 500; const session = mongoclient.startSession({causalConsistency: false}); await accountColl.drop(); await accountColl.insertOne({name: "Alice", balance: 1000}, { session }); await accountColl.insertOne({name: "Bob", balance: 1000}, { session }); const transactionOptions = { readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' } }; // deduct $500 from Alice's account var aliceBalance = await accountColl.findOne({name: "Alice"}, {session}); assert(aliceBalance.balance >= amountToTransfer); var newAliceBalance = aliceBalance - amountToTransfer; session.startTransaction(transactionOptions); await accountColl.updateOne({name: "Alice"}, {$set: {balance: newAliceBalance}}, {session }); await session.commitTransaction(); aliceBalance = await accountColl.findOne({name: "Alice"}, {session}); assert(newAliceBalance == aliceBalance.balance); // add $500 to Bob's account var bobBalance = await accountColl.findOne({name: "Bob"}, {session}); var newBobBalance = bobBalance.balance + amountToTransfer; session.startTransaction(transactionOptions); await accountColl.updateOne({name: "Bob"}, {$set: {balance: newBobBalance}}, {session }); await session.commitTransaction(); bobBalance = await accountColl.findOne({name: "Bob"}, {session}); assert(newBobBalance == bobBalance.balance);
C#

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec C#.

// C# Callback API var dbName = "bank"; var collName = "account"; var amountToTransfer = 500; using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false})) { var bankDB = client.GetDatabase(dbName); var accountColl = bankDB.GetCollection<BsonDocument>(collName); bankDB.DropCollection(collName); accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } }); accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } }); // start transaction var transactionOptions = new TransactionOptions( readConcern: ReadConcern.Snapshot, writeConcern: WriteConcern.WMajority); var result = session.WithTransaction( (sess, cancellationtoken) => { // deduct $500 from Alice's account var aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceBalance >= amountToTransfer); var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer; accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice"), Builders<BsonDocument>.Update.Set("balance", newAliceBalance)); aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceBalance == newAliceBalance); // add $500 from Bob's account var bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); var newBobBalance = bobBalance.AsInt32 + amountToTransfer; accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob"), Builders<BsonDocument>.Update.Set("balance", newBobBalance)); bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); Debug.Assert(bobBalance == newBobBalance); return "Transaction committed"; }, transactionOptions); // check values outside of transaction var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceNewBalance == 500); Debug.Assert(bobNewBalance == 1500); }
Ruby

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Ruby.

// Ruby Callback API dbName = "bank" collName = "account" amountToTransfer = 500 session = client.start_session(:causal_consistency=> false) bankDB = Mongo::Database.new(client, dbName) accountColl = bankDB[collName] accountColl.drop() accountColl.insert_one({"name"=>"Alice", "balance"=>1000}) accountColl.insert_one({"name"=>"Bob", "balance"=>1000}) # start transaction session.with_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) do # deduct $500 from Alice's account aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance'] assert aliceBalance >= amountToTransfer newAliceBalance = aliceBalance - amountToTransfer accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session) aliceBalance = accountColl.find({"name"=>>"Alice"}, :session=> session).first['balance'] assert_equal(newAliceBalance, aliceBalance) # add $500 from Bob's account bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance'] newBobBalance = bobBalance + amountToTransfer accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session) bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance'] assert_equal(newBobBalance, bobBalance) end # check results outside of transaction aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance'] bobBalance = accountColl.find({"name"=>"Bob"}).first['balance'] assert_equal(aliceBalance, 500) assert_equal(bobBalance, 1500) session.end_session
Go

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Go.

// Go - Callback API type Account struct { Name string Balance int } ctx := context.TODO() dbName := "bank" collName := "account" amountToTransfer := 500 session, err := client.StartSession(options.Session().SetCausalConsistency(false)) assert.NilError(t, err) defer session.EndSession(ctx) bankDB := client.Database(dbName) accountColl := bankDB.Collection(collName) accountColl.Drop(ctx) _, err = accountColl.InsertOne(ctx, bson.M{"name" : "Alice", "balance":1000}) _, err = accountColl.InsertOne(ctx, bson.M{"name" : "Bob", "balance":1000}) transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.New(writeconcern.WMajority())) _, err = session.WithTransaction(ctx, func(sessionCtx mongo.SessionContext) (interface{}, error) { var result Account // deduct $500 from Alice's account err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result) aliceBalance := result.Balance newAliceBalance := aliceBalance - amountToTransfer _, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}}) err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result) aliceBalance = result.Balance assert.Equal(t, aliceBalance, newAliceBalance) // add $500 to Bob's account err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result) bobBalance := result.Balance newBobBalance := bobBalance + amountToTransfer _, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}}) err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result) bobBalance = result.Balance assert.Equal(t, bobBalance, newBobBalance) if err != nil { return nil, err } return "transaction committed", err }, transactionOptions) // check results outside of transaction var result Account err = accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result) aliceNewBalance := result.Balance err = accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result) bobNewBalance := result.Balance assert.Equal(t, aliceNewBalance, 500) assert.Equal(t, bobNewBalance, 1500) // Go - Core API type Account struct { Name string Balance int } func transferMoneyWithRetry(sessionContext mongo.SessionContext, accountColl *mongo.Collection, t *testing.T) error { amountToTransfer := 500 transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.New(writeconcern.WMajority())) if err := sessionContext.StartTransaction(transactionOptions); err != nil { panic(err) } var result Account // deduct $500 from Alice's account err := accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result) aliceBalance := result.Balance newAliceBalance := aliceBalance - amountToTransfer _, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}}) if err != nil { sessionContext.AbortTransaction(sessionContext) } err = accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result) aliceBalance = result.Balance assert.Equal(t, aliceBalance, newAliceBalance) // add $500 to Bob's account err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result) bobBalance := result.Balance newBobBalance := bobBalance + amountToTransfer _, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}}) if err != nil { sessionContext.AbortTransaction(sessionContext) } err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result) bobBalance = result.Balance assert.Equal(t, bobBalance, newBobBalance) err = sessionContext.CommitTransaction(sessionContext) return err } func doTransactionWithRetry(t *testing.T) { ctx := context.TODO() dbName := "bank" collName := "account" bankDB := client.Database(dbName) accountColl := bankDB.Collection(collName) client.UseSessionWithOptions(ctx, options.Session().SetCausalConsistency(false), func(sessionContext mongo.SessionContext) error { accountColl.Drop(ctx) accountColl.InsertOne(sessionContext, bson.M{"name" : "Alice", "balance":1000}) accountColl.InsertOne(sessionContext, bson.M{"name" : "Bob", "balance":1000}) for { err := transferMoneyWithRetry(sessionContext, accountColl, t) if err == nil { println("transaction committed") return nil } if mongoErr := err.(mongo.CommandError); mongoErr.HasErrorLabel("TransientTransactionError") { continue } println("transaction failed") return err } }) // check results outside of transaction var result Account accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&esult) aliceBalance := result.Balance assert.Equal(t, aliceBalance, 500) accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result) bobBalance := result.Balance assert.Equal(t, bobBalance, 1500) }
Java

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Java.

// Java (sync) - Callback API MongoDatabase bankDB = mongoClient.getDatabase("bank"); MongoCollection accountColl = bankDB.getCollection("account"); accountColl.drop(); int amountToTransfer = 500; // add sample data accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)); accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)); TransactionOptions txnOptions = TransactionOptions.builder() .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build(); ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build(); try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) { clientSession.withTransaction(new TransactionBody<Void>() { @Override public Void execute() { // deduct $500 from Alice's account List<Document> documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Alice")).into(documentList); int aliceBalance = (int) documentList.get(0).get("balance"); int newAliceBalance = aliceBalance - amountToTransfer; accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))); // check Alice's new balance documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Alice")).into(documentList); int updatedBalance = (int) documentList.get(0).get("balance"); Assert.assertEquals(updatedBalance, newAliceBalance); // add $500 to Bob's account documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Bob")).into(documentList); int bobBalance = (int) documentList.get(0).get("balance"); int newBobBalance = bobBalance + amountToTransfer; accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))); // check Bob's new balance documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Bob")).into(documentList); updatedBalance = (int) documentList.get(0).get("balance"); Assert.assertEquals(updatedBalance, newBobBalance); return null; } }, txnOptions); }
C

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec C.

// Sample Code for C with Callback #include <bson.h> #include <mongoc.h> #include <stdio.h> #include <string.h> #include <assert.h> typedef struct { int64_t balance; bson_t *account; bson_t *opts; mongoc_collection_t *collection; } ctx_t; bool callback_session (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error) { bool r = true; ctx_t *data = (ctx_t *) ctx; bson_t local_reply; bson_t *selector = data->account; bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (data->balance), "}"); mongoc_collection_update_one (data->collection, selector, update, data->opts, &local_reply, error); *reply = bson_copy (&local_reply); bson_destroy (&local_reply); bson_destroy (update); return r; } void test_callback_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){ bson_t reply; bool r = true; const bson_t *doc; bson_iter_t iter; ctx_t alice_ctx; ctx_t bob_ctx; bson_error_t error; // find query bson_t *alice_query = bson_new (); BSON_APPEND_UTF8(alice_query, "name", "Alice"); bson_t *bob_query = bson_new (); BSON_APPEND_UTF8(bob_query, "name", "Bob"); // create session // set causal consistency to false mongoc_session_opt_t *session_opts = mongoc_session_opts_new (); mongoc_session_opts_set_causal_consistency (session_opts, false); // start the session mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error); // add session to options bson_t *opts = bson_new(); mongoc_client_session_append (client_session, opts, &error); // deduct 500 from Alice // find account balance of Alice mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64; assert(alice_balance >= amount_to_transfer); int64_t new_alice_balance = alice_balance - amount_to_transfer; // set variables which will be used by callback function alice_ctx.collection = collection; alice_ctx.opts = opts; alice_ctx.balance = new_alice_balance; alice_ctx.account = alice_query; // callback r = mongoc_client_session_with_transaction (client_session, &callback_session, NULL, &alice_ctx, &reply, &error); assert(r); // find account balance of Alice after transaction cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); alice_balance = (bson_iter_value (&iter))->value.v_int64; assert(alice_balance == new_alice_balance); assert(alice_balance == 500); // add 500 to bob's balance // find account balance of Bob cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64; int64_t new_bob_balance = bob_balance + amount_to_transfer; bob_ctx.collection = collection; bob_ctx.opts = opts; bob_ctx.balance = new_bob_balance; bob_ctx.account = bob_query; // set read & write concern mongoc_read_concern_t *read_concern = mongoc_read_concern_new (); mongoc_write_concern_t *write_concern = mongoc_write_concern_new (); mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new (); mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY); mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT); mongoc_transaction_opts_set_write_concern (txn_opts, write_concern); mongoc_transaction_opts_set_read_concern (txn_opts, read_concern); // callback r = mongoc_client_session_with_transaction (client_session, &callback_session, txn_opts, &bob_ctx, &reply, &error); assert(r); // find account balance of Bob after transaction cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); bob_balance = (bson_iter_value (&iter))->value.v_int64; assert(bob_balance == new_bob_balance); assert(bob_balance == 1500); // cleanup bson_destroy(alice_query); bson_destroy(bob_query); mongoc_client_session_destroy(client_session); bson_destroy(opts); mongoc_transaction_opts_destroy(txn_opts); mongoc_read_concern_destroy(read_concern); mongoc_write_concern_destroy(write_concern); mongoc_cursor_destroy(cursor); bson_destroy(doc); } int main(int argc, char* argv[]) { mongoc_init (); mongoc_client_t* client = mongoc_client_new (<connection uri>); bson_error_t error; // connect to bank db mongoc_database_t *database = mongoc_client_get_database (client, "bank"); // access account collection mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account"); // set amount to transfer int64_t amount_to_transfer = 500; // delete the collection if already existing mongoc_collection_drop(collection, &error); // open Alice account bson_t *alice_account = bson_new (); BSON_APPEND_UTF8(alice_account, "name", "Alice"); BSON_APPEND_INT64(alice_account, "balance", 1000); // open Bob account bson_t *bob_account = bson_new (); BSON_APPEND_UTF8(bob_account, "name", "Bob"); BSON_APPEND_INT64(bob_account, "balance", 1000); bool r = true; r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error); if (!r) {printf("Error encountered:%s", error.message);} r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error); if (!r) {printf("Error encountered:%s", error.message);} test_callback_money_transfer(client, collection, amount_to_transfer); }
Python

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Python.

// Sample Python code with callback api import pymongo def callback(session, balance, query): collection.update_one(query, {'$set': {"balance": balance}}, session=session) client = pymongo.MongoClient(<connection uri>) rc_snapshot = pymongo.read_concern.ReadConcern('snapshot') wc_majority = pymongo.write_concern.WriteConcern('majority') # To start, drop and create an account collection and insert balances for both Alice and Bob collection = client.get_database("bank").get_collection("account") collection.drop() collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000}) collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000}) amount_to_transfer = 500 # deduct 500 from Alice's account alice_balance = collection.find_one({"name": "Alice"}).get("balance") assert alice_balance >= amount_to_transfer new_alice_balance = alice_balance - amount_to_transfer with client.start_session({'causalConsistency':False}) as session: session.with_transaction(lambda s: callback(s, new_alice_balance, {"name": "Alice"}), read_concern=rc_snapshot, write_concern=wc_majority) updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance") assert updated_alice_balance == new_alice_balance # add 500 to Bob's account bob_balance = collection.find_one({"name": "Bob"}).get("balance") assert bob_balance >= amount_to_transfer new_bob_balance = bob_balance + amount_to_transfer with client.start_session({'causalConsistency':False}) as session: session.with_transaction(lambda s: callback(s, new_bob_balance, {"name": "Bob"}), read_concern=rc_snapshot, write_concern=wc_majority) updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance") assert updated_bob_balance == new_bob_balance Sample Python code with Core api import pymongo client = pymongo.MongoClient(<connection_string>) rc_snapshot = pymongo.read_concern.ReadConcern('snapshot') wc_majority = pymongo.write_concern.WriteConcern('majority') # To start, drop and create an account collection and insert balances for both Alice and Bob collection = client.get_database("bank").get_collection("account") collection.drop() collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000}) collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000}) amount_to_transfer = 500 # deduct 500 from Alice's account alice_balance = collection.find_one({"name": "Alice"}).get("balance") assert alice_balance >= amount_to_transfer new_alice_balance = alice_balance - amount_to_transfer with client.start_session({'causalConsistency':False}) as session: session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority) collection.update_one({"name": "Alice"}, {'$set': {"balance": new_alice_balance}}, session=session) session.commit_transaction() updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance") assert updated_alice_balance == new_alice_balance # add 500 to Bob's account bob_balance = collection.find_one({"name": "Bob"}).get("balance") assert bob_balance >= amount_to_transfer new_bob_balance = bob_balance + amount_to_transfer with client.start_session({'causalConsistency':False}) as session: session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority) collection.update_one({"name": "Bob"}, {'$set': {"balance": new_bob_balance}}, session=session) session.commit_transaction() updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance") assert updated_bob_balance == new_bob_balance

Exemples d'API de transaction pour l'API principale

Javascript

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Javascript.

// *** Transfer $500 from Alice to Bob inside a transaction: Success *** // Setup bank account for Alice and Bob. Each have $1000 in their account var databaseName = "bank"; var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var bankDB = session.getDatabase(databaseName); var accountColl = bankDB[collectionName]; accountColl.drop(); accountColl.insert({name: "Alice", balance: 1000}); accountColl.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountColl.find({"name": "Alice"}).next().balance; assert(aliceBalance >= amountToTransfer); var newAliceBalance = aliceBalance - amountToTransfer; accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance; assert.eq(newAliceBalance, findAliceBalance); // add $500 to Bob's account var bobBalance = accountColl.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountColl.find({"name": "Bob"}).next().balance; assert.eq(newBobBalance, findBobBalance); session.commitTransaction(); accountColl.find();
C#

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec C#.

// C# Core API public void TransferMoneyWithRetry(IMongoCollection<bSondocument> accountColl, IClientSessionHandle session) { var amountToTransfer = 500; // start transaction var transactionOptions = new TransactionOptions( readConcern: ReadConcern.Snapshot, writeConcern: WriteConcern.WMajority); session.StartTransaction(transactionOptions); try { // deduct $500 from Alice's account var aliceBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceBalance >= amountToTransfer); var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer; accountColl.UpdateOne(session, Builders<bSondocument>.Filter.Eq("name", "Alice"), Builders<bSondocument>.Update.Set("balance", newAliceBalance)); aliceBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceBalance == newAliceBalance); // add $500 from Bob's account var bobBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); var newBobBalance = bobBalance.AsInt32 + amountToTransfer; accountColl.UpdateOne(session, Builders<bSondocument>.Filter.Eq("name", "Bob"), Builders<bSondocument>.Update.Set("balance", newBobBalance)); bobBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); Debug.Assert(bobBalance == newBobBalance); } catch (Exception e) { session.AbortTransaction(); throw; } session.CommitTransaction(); } } public void DoTransactionWithRetry(MongoClient client) { var dbName = "bank"; var collName = "account"; using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false})) { try { var bankDB = client.GetDatabase(dbName); var accountColl = bankDB.GetCollection<bSondocument>(collName); bankDB.DropCollection(collName); accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } }); accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } }); while(true) { try { TransferMoneyWithRetry(accountColl, session); break; } catch (MongoException e) { if(e.HasErrorLabel("TransientTransactionError")) { continue; } else { throw; } } } // check values outside of transaction var aliceNewBalance = accountColl.Find(Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); var bobNewBalance = accountColl.Find(Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceNewBalance == 500); Debug.Assert(bobNewBalance == 1500); } catch (Exception e) { Console.WriteLine("Error running transaction: " + e.Message); } } }
Ruby

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Ruby.

# Ruby Core API def transfer_money_w_retry(session, accountColl) amountToTransfer = 500 session.start_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) # deduct $500 from Alice's account aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance'] assert aliceBalance >= amountToTransfer newAliceBalance = aliceBalance - amountToTransfer accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session) aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance'] assert_equal(newAliceBalance, aliceBalance) # add $500 to Bob's account bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance'] newBobBalance = bobBalance + amountToTransfer accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session) bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance'] assert_equal(newBobBalance, bobBalance) session.commit_transaction end def do_txn_w_retry(client) dbName = "bank" collName = "account" session = client.start_session(:causal_consistency=> false) bankDB = Mongo::Database.new(client, dbName) accountColl = bankDB[collName] accountColl.drop() accountColl.insert_one({"name"=>"Alice", "balance"=>1000}) accountColl.insert_one({"name"=>"Bob", "balance"=>1000}) begin transferMoneyWithRetry(session, accountColl) puts "transaction committed" rescue Mongo::Error => e if e.label?('TransientTransactionError') retry else puts "transaction failed" raise end end # check results outside of transaction aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance'] bobBalance = accountColl.find({"name"=>"Bob"}).first['balance'] assert_equal(aliceBalance, 500) assert_equal(bobBalance, 1500) end
Java

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Java.

// Java (sync) - Core API public void transferMoneyWithRetry() { // connect to server MongoClientURI mongoURI = new MongoClientURI(uri); MongoClient mongoClient = new MongoClient(mongoURI); MongoDatabase bankDB = mongoClient.getDatabase("bank"); MongoCollection accountColl = bankDB.getCollection("account"); accountColl.drop(); // insert some sample data accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)); accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)); while (true) { try { doTransferMoneyWithRetry(accountColl, mongoClient); break; } catch (MongoException e) { if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) { continue; } else { throw e; } } } } public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) { int amountToTransfer = 500; TransactionOptions txnOptions = TransactionOptions.builder() .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build(); ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build(); try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) { clientSession.startTransaction(txnOptions); // deduct $500 from Alice's account List<Document> documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Alice")).into(documentList); int aliceBalance = (int) documentList.get(0).get("balance"); Assert.assertTrue(aliceBalance >= amountToTransfer); int newAliceBalance = aliceBalance - amountToTransfer; accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))); // check Alice's new balance documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Alice")).into(documentList); int updatedBalance = (int) documentList.get(0).get("balance"); Assert.assertEquals(updatedBalance, newAliceBalance); // add $500 to Bob's account documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Bob")).into(documentList); int bobBalance = (int) documentList.get(0).get("balance"); int newBobBalance = bobBalance + amountToTransfer; accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))); // check Bob's new balance documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Bob")).into(documentList); updatedBalance = (int) documentList.get(0).get("balance"); Assert.assertEquals(updatedBalance, newBobBalance); // commit transaction clientSession.commitTransaction(); } } // Java (async) -- Core API public void transferMoneyWithRetry() { // connect to the server MongoClient mongoClient = MongoClients.create(uri); MongoDatabase bankDB = mongoClient.getDatabase("bank"); MongoCollection accountColl = bankDB.getCollection("account"); SubscriberLatchWrapper<Void> dropCallback = new SubscriberLatchWrapper<>(); mongoClient.getDatabase("bank").drop().subscribe(dropCallback); dropCallback.await(); // insert some sample data SubscriberLatchWrapper<InsertOneResult> insertionCallback = new SubscriberLatchWrapper<>(); accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)).subscribe(insertionCallback); insertionCallback.await(); insertionCallback = new SubscriberLatchWrapper<>(); accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)).subscribe(insertionCallback);; insertionCallback.await(); while (true) { try { doTransferMoneyWithRetry(accountColl, mongoClient); break; } catch (MongoException e) { if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) { continue; } else { throw e; } } } } public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) { int amountToTransfer = 500; // start the transaction TransactionOptions txnOptions = TransactionOptions.builder() .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build(); ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build(); SubscriberLatchWrapper<ClientSession> sessionCallback = new SubscriberLatchWrapper<>(); mongoClient.startSession(sessionOptions).subscribe(sessionCallback); ClientSession session = sessionCallback.get().get(0); session.startTransaction(txnOptions); // deduct $500 from Alice's account SubscriberLatchWrapper<Document> findCallback = new SubscriberLatchWrapper<>(); accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback); Document documentFound = findCallback.get().get(0); int aliceBalance = (int) documentFound.get("balance"); int newAliceBalance = aliceBalance - amountToTransfer; SubscriberLatchWrapper<UpdateResult> updateCallback = new SubscriberLatchWrapper<>(); accountColl.updateOne(session, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))).subscribe(updateCallback); updateCallback.await(); // check Alice's new balance findCallback = new SubscriberLatchWrapper<>(); accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback); documentFound = findCallback.get().get(0); int updatedBalance = (int) documentFound.get("balance"); Assert.assertEquals(updatedBalance, newAliceBalance); // add $500 to Bob's account findCallback = new SubscriberLatchWrapper<>(); accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback); documentFound = findCallback.get().get(0); int bobBalance = (int) documentFound.get("balance"); int newBobBalance = bobBalance + amountToTransfer; updateCallback = new SubscriberLatchWrapper<>(); accountColl.updateOne(session, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))).subscribe(updateCallback); updateCallback.await(); // check Bob's new balance findCallback = new SubscriberLatchWrapper<>(); accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback); documentFound = findCallback.get().get(0); updatedBalance = (int) documentFound.get("balance"); Assert.assertEquals(updatedBalance, newBobBalance); // commit the transaction SubscriberLatchWrapper<Void> transactionCallback = new SubscriberLatchWrapper<>(); session.commitTransaction().subscribe(transactionCallback); transactionCallback.await(); } public class SubscriberLatchWrapper<T> implements Subscriber<T> { /** * A Subscriber that stores the publishers results and provides a latch so can block on completion. * * @param <T> The publishers result type */ private final List<T> received; private final List<RuntimeException> errors; private final CountDownLatch latch; private volatile Subscription subscription; private volatile boolean completed; /** * Construct an instance */ public SubscriberLatchWrapper() { this.received = new ArrayList<>(); this.errors = new ArrayList<>(); this.latch = new CountDownLatch(1); } @Override public void onSubscribe(final Subscription s) { subscription = s; subscription.request(Integer.MAX_VALUE); } @Override public void onNext(final T t) { received.add(t); } @Override public void onError(final Throwable t) { if (t instanceof RuntimeException) { errors.add((RuntimeException) t); } else { errors.add(new RuntimeException("Unexpected exception", t)); } onComplete(); } @Override public void onComplete() { completed = true; subscription.cancel(); latch.countDown(); } /** * Get received elements * * @return the list of received elements */ public List<T> getReceived() { return received; } /** * Get received elements. * * @return the list of receive elements */ public List<T> get() { return await().getReceived(); } /** * Await completion or error * * @return this */ public SubscriberLatchWrapper<T> await() { subscription.request(Integer.MAX_VALUE); try { if (!latch.await(300, TimeUnit.SECONDS)) { throw new MongoTimeoutException("Publisher onComplete timed out for 300 seconds"); } } catch (InterruptedException e) { throw new MongoInterruptedException("Interrupted waiting for observeration", e); } if (!errors.isEmpty()) { throw errors.get(0); } return this; } public boolean getCompleted() { return this.completed; } public void close() { subscription.cancel(); received.clear(); } }
C

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec C.

// Sample C code with core session bool core_session(mongoc_client_session_t *client_session, mongoc_collection_t* collection, bson_t *selector, int64_t balance){ bool r = true; bson_error_t error; bson_t *opts = bson_new(); bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (balance), "}"); // set read & write concern mongoc_read_concern_t *read_concern = mongoc_read_concern_new (); mongoc_write_concern_t *write_concern = mongoc_write_concern_new (); mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new (); mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY); mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT); mongoc_transaction_opts_set_write_concern (txn_opts, write_concern); mongoc_transaction_opts_set_read_concern (txn_opts, read_concern); mongoc_client_session_start_transaction (client_session, txn_opts, &error); mongoc_client_session_append (client_session, opts, &error); r = mongoc_collection_update_one (collection, selector, update, opts, NULL, &error); mongoc_client_session_commit_transaction (client_session, NULL, &error); bson_destroy (opts); mongoc_transaction_opts_destroy(txn_opts); mongoc_read_concern_destroy(read_concern); mongoc_write_concern_destroy(write_concern); bson_destroy (update); return r; } void test_core_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){ bson_t reply; bool r = true; const bson_t *doc; bson_iter_t iter; bson_error_t error; // find query bson_t *alice_query = bson_new (); BSON_APPEND_UTF8(alice_query, "name", "Alice"); bson_t *bob_query = bson_new (); BSON_APPEND_UTF8(bob_query, "name", "Bob"); // create session // set causal consistency to false mongoc_session_opt_t *session_opts = mongoc_session_opts_new (); mongoc_session_opts_set_causal_consistency (session_opts, false); // start the session mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error); // add session to options bson_t *opts = bson_new(); mongoc_client_session_append (client_session, opts, &error); // deduct 500 from Alice // find account balance of Alice mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64; assert(alice_balance >= amount_to_transfer); int64_t new_alice_balance = alice_balance - amount_to_transfer; // core r = core_session (client_session, collection, alice_query, new_alice_balance); assert(r); // find account balance of Alice after transaction cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); alice_balance = (bson_iter_value (&iter))->value.v_int64; assert(alice_balance == new_alice_balance); assert(alice_balance == 500); // add 500 to Bob's balance // find account balance of Bob cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64; int64_t new_bob_balance = bob_balance + amount_to_transfer; //core r = core_session (client_session, collection, bob_query, new_bob_balance); assert(r); // find account balance of Bob after transaction cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); bob_balance = (bson_iter_value (&iter))->value.v_int64; assert(bob_balance == new_bob_balance); assert(bob_balance == 1500); // cleanup bson_destroy(alice_query); bson_destroy(bob_query); mongoc_client_session_destroy(client_session); bson_destroy(opts); mongoc_cursor_destroy(cursor); bson_destroy(doc); } int main(int argc, char* argv[]) { mongoc_init (); mongoc_client_t* client = mongoc_client_new (<connection uri>); bson_error_t error; // connect to bank db mongoc_database_t *database = mongoc_client_get_database (client, "bank"); // access account collection mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account"); // set amount to transfer int64_t amount_to_transfer = 500; // delete the collection if already existing mongoc_collection_drop(collection, &error); // open Alice account bson_t *alice_account = bson_new (); BSON_APPEND_UTF8(alice_account, "name", "Alice"); BSON_APPEND_INT64(alice_account, "balance", 1000); // open Bob account bson_t *bob_account = bson_new (); BSON_APPEND_UTF8(bob_account, "name", "Bob"); BSON_APPEND_INT64(bob_account, "balance", 1000); bool r = true; r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error); if (!r) {printf("Error encountered:%s", error.message);} r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error); if (!r) {printf("Error encountered:%s", error.message);} test_core_money_transfer(client, collection, amount_to_transfer); }
Scala

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Scala.

// Scala Core API def transferMoneyWithRetry(sessionObservable: SingleObservable[ClientSession] , database: MongoDatabase ): Unit = { val accountColl = database.getCollection("account") var amountToTransfer = 500 var transactionObservable: Observable[ClientSession] = sessionObservable.map(clientSession => { clientSession.startTransaction() // deduct $500 from Alice's account var aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance") assert(aliceBalance >= amountToTransfer) var newAliceBalance = aliceBalance - amountToTransfer accountColl.updateOne(clientSession, Document("name" -> "Alice"), Document("$set" -> Document("balance" -> newAliceBalance))).await() aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance") assert(aliceBalance == newAliceBalance) // add $500 to Bob's account var bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance") var newBobBalance = bobBalance + amountToTransfer accountColl.updateOne(clientSession, Document("name" -> "Bob"), Document("$set" -> Document("balance" -> newBobBalance))).await() bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance") assert(bobBalance == newBobBalance) clientSession }) transactionObservable.flatMap(clientSession => clientSession.commitTransaction()).await() } def doTransactionWithRetry(): Unit = { val client: MongoClient = MongoClientWrapper.getMongoClient() val database: MongoDatabase = client.getDatabase("bank") val accountColl = database.getCollection("account") accountColl.drop().await() val sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build() var sessionObservable: SingleObservable[ClientSession] = client.startSession(sessionOptions) accountColl.insertOne(Document("name" -> "Alice", "balance" -> 1000)).await() accountColl.insertOne(Document("name" -> "Bob", "balance" -> 1000)).await() var retry = true while (retry) { try { transferMoneyWithRetry(sessionObservable, database) println("transaction committed") retry = false } catch { case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => { println("retrying transaction") } case other: Throwable => { println("transaction failed") retry = false throw other } } } // check results outside of transaction assert(accountColl.find(Document("name" -> "Alice")).results().head.getInteger("balance") == 500) assert(accountColl.find(Document("name" -> "Bob")).results().head.getInteger("balance") == 1500) accountColl.drop().await() }

Commandes prises en charge

Commande Pris en charge

abortTransaction

Oui

commitTransaction

Oui

endSessions

Oui

killSession

Oui

killAllSession

Oui

killAllSessionsByPattern

Non

refreshSessions

Non

startSession

Oui

Fonctionnalités non prises en charge

Méthodes Étapes ou commandes

db.collection.aggregate()

$collStats

$currentOp

$indexStats

$listSessions

$out

db.collection.count()

db.collection.countDocuments()

$where

$near

$nearSphere

db.collection.insert()

insertn'est pas pris en charge s'il n'est pas exécuté sur une collection existante. Cette méthode est prise en charge si elle cible une collection préexistante.

Séances

Les sessions MongoDB sont un framework utilisé pour prendre en charge les écritures réessayables, la cohérence causale, les transactions et la gestion des opérations entre les bases de données. Lorsqu'une session est créée, un identifiant de session logique (lsid) est généré par le client et est utilisé pour étiqueter toutes les opérations au sein de cette session lors de l'envoi de commandes au serveur.

Amazon DocumentDB prend en charge l'utilisation de sessions pour activer les transactions, mais ne prend pas en charge la cohérence causale ni les écritures réessayables.

Lorsque vous utilisez des transactions dans Amazon DocumentDB, une transaction est initiée depuis une session à l'aide de l'session.startTransaction()API et une session prend en charge une seule transaction à la fois. De même, les transactions sont effectuées à l'aide des API commit (session.commitTransaction()) ou abort (session.abortTransaction()).

Cohérence causale

La cohérence causale garantit qu'au cours d'une seule session client, le client observera la read-after-write cohérence, les lectures/écritures monoatomiques et les écritures suivront les lectures et ces garanties s'appliquent à toutes les instances d'un cluster, et pas seulement à la principale. Amazon DocumentDB ne prend pas en charge la cohérence causale et la déclaration suivante provoquera une erreur.

var mySession = db.getMongo().startSession(); var mySessionObject = mySession.getDatabase('test').getCollection('account'); mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}}); //Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 } mySessionObject.find() //Error: error: { // "ok" : 0, // "code" : 303, // "errmsg" : "Feature not supported: 'causal consistency'", // "operationTime" : Timestamp(1603461817, 493214) //} mySession.endSession()

Vous pouvez désactiver la cohérence causale au sein d'une session. Veuillez noter que cela vous permettra d'utiliser le cadre de session, mais ne fournira aucune garantie de cohérence causale pour les lectures. Lorsque vous utilisez Amazon DocumentDB, les lectures à partir de la base principale seront read-after-write cohérentes et les lectures à partir des instances de réplication finiront par être cohérentes. Les transactions constituent le principal cas d'utilisation des sessions.

var mySession = db.getMongo().startSession({causalConsistency: false}); var mySessionObject = mySession.getDatabase('test').getCollection('account'); mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}}); //Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 } mySessionObject.find() //{ "_id" : 1, "name" : "Bob", "balance" : 100 } //{ "_id" : 2, "name" : "Alice", "balance" : 1700 }

écritures réessayables

Les écritures réessayables sont une fonctionnalité grâce à laquelle le client tente de réessayer les opérations d'écriture, une seule fois, lorsque des erreurs réseau se produisent ou s'il ne parvient pas à trouver l'écriture principale. Dans Amazon DocumentDB, les écritures réessayables ne sont pas prises en charge et doivent être désactivées. Vous pouvez le désactiver à l'aide de la commande (retryWrites=false) dans la chaîne de connexion.

Exception : Si vous utilisez mongo shell, n'incluez laretryWrites=false commande dans aucune chaîne de code. Par défaut, les écritures réessayables sont désactivées. L'inclusionretryWrites=false peut entraîner l'échec des commandes de lecture normales.

Erreurs de transaction

Lorsque vous utilisez des transactions, certains scénarios peuvent générer une erreur indiquant qu'un numéro de transaction ne correspond à aucune transaction en cours.

L'erreur peut être générée dans au moins deux scénarios différents :

  • After the one-minute transaction timeout.
  • After an instance restart (due to patching, crash recovery, etc.), it is possible to receive this error even in cases where the transaction successfully committed. During an instance restart, the database can't tell the difference between a transaction that successfully completed versus a transaction that aborted. In other words, the transaction completion state is ambiguous.

La meilleure façon de gérer cette erreur est de rendre les mises à jour transactionnelles idempotentes, par exemple en utilisant le$set mutateur au lieu d'une opération d'incrémentation/décrémentation. Voir ci-dessous :

{ "ok" : 0, "operationTime" : Timestamp(1603938167, 1), "code" : 251, "errmsg" : "Given transaction number 1 does not match any in-progress transactions." }