Transactions dans Amazon DocumentDB - Amazon DocumentDB

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Transactions dans Amazon DocumentDB

Amazon DocumentDB (compatible avec MongoDB) prend désormais en charge la compatibilité avec MongoDB 4.0, y compris les transactions. Vous pouvez effectuer des transactions sur plusieurs documents, relevés, collections et bases de données. Les transactions simplifient le développement d'applications en vous permettant d'effectuer des opérations atomiques, cohérentes, isolées et durables (ACID) sur un ou plusieurs documents d'un cluster Amazon DocumentDB. Les cas d'utilisation courants des transactions incluent le traitement financier, l'exécution et la gestion des commandes, ainsi que la création de jeux multijoueurs.

Il n'y a aucun coût supplémentaire pour les transactions. Vous ne payez que pour la lecture et l'écriture IOs que vous consommez dans le cadre des transactions.

Prérequis

Pour utiliser la fonctionnalité de transactions, vous devez satisfaire aux exigences suivantes :

  • Vous devez utiliser le moteur Amazon DocumentDB 4.0.

  • Vous devez utiliser un pilote compatible avec MongoDB 4.0 ou version ultérieure.

Bonnes pratiques

Voici quelques bonnes pratiques qui vous permettront de tirer le meilleur parti des transactions avec Amazon DocumentDB.

  • Validez ou annulez toujours la transaction une fois qu'elle est terminée. Le fait de laisser une transaction dans un état incomplet sollicite les ressources de la base de données et peut provoquer des conflits d'écriture.

  • Il est recommandé de limiter les transactions au plus petit nombre de commandes nécessaires. Si vous avez des transactions comportant plusieurs relevés qui peuvent être divisés en plusieurs transactions plus petites, il est conseillé de le faire afin de réduire le risque d'un délai d'attente. Essayez toujours de créer des transactions courtes, et non des lectures de longue durée.

Limites

  • Amazon DocumentDB ne prend pas en charge les curseurs dans une transaction.

  • Amazon DocumentDB ne peut pas créer de nouvelles collections dans une transaction et ne peut pas interroger/mettre à jour des collections non existantes.

  • Les blocages d'écriture au niveau du document sont soumis à un délai d'une minute, qui n'est pas configurable par l'utilisateur.

  • Les commandes d'écriture réessayable, de validation réessayable et d'abandon réessayable ne sont pas prises en charge dans Amazon DocumentDB. Si vous utilisez un ancien shell mongo (et non mongosh), n'incluez la retryWrites=false commande dans aucune chaîne de code. Par défaut, les écritures réessayables sont désactivées. L'inclusion retryWrites=false peut entraîner un échec des commandes de lecture normales.

  • Chaque instance Amazon DocumentDB a une limite supérieure quant au nombre de transactions simultanées ouvertes simultanément sur l'instance. Pour les limites, veuillez consulterLimites d’instance.

  • Pour une transaction donnée, la taille du journal des transactions doit être inférieure à 32 Mo.

  • Amazon DocumentDB prend en charge les transactions count() au sein d'une même transaction, mais tous les pilotes ne prennent pas en charge cette fonctionnalité. Une alternative consiste à utiliser l'countDocuments()API, qui traduit la requête de comptage en une requête d'agrégation côté client.

  • Les transactions ont une limite d'exécution d'une minute et les sessions ont un délai d'expiration de 30 minutes. Si le délai d'expiration d'une transaction est dépassé, elle sera abandonnée et toutes les commandes suivantes émises au cours de la session pour la transaction existante produiront l'erreur suivante :

    WriteCommandError({ "ok" : 0, "operationTime" : Timestamp(1603491424, 627726), "code" : 251, "errmsg" : "Given transaction number 0 does not match any in-progress transactions." }

Surveillance et diagnostic

Grâce à la prise en charge des transactions dans Amazon DocumentDB 4.0, des CloudWatch métriques supplémentaires ont été ajoutées pour vous aider à surveiller vos transactions.

Nouvelles CloudWatch métriques

  • DatabaseTransactions: Le nombre de transactions ouvertes effectuées sur une période d'une minute.

  • DatabaseTransactionsAborted: le nombre de transactions annulées effectuées sur une période d'une minute.

  • DatabaseTransactionsMax: Le nombre maximum de transactions ouvertes sur une période d'une minute.

  • TransactionsAborted: le nombre de transactions abandonnées sur une instance au cours d'une période d'une minute.

  • TransactionsCommitted: le nombre de transactions validées sur une instance au cours d'une période d'une minute.

  • TransactionsOpen: le nombre de transactions ouvertes sur une instance prise sur une période d'une minute.

  • TransactionsOpenMax: Le nombre maximum de transactions ouvertes sur une instance sur une période d'une minute.

  • TransactionsStarted: le nombre de transactions démarrées sur une instance au cours d'une période d'une minute.

Note

Pour plus d' CloudWatch informations sur Amazon DocumentDB, rendez-vous sur. Surveillance d'Amazon DocumentDB avec CloudWatch

De plus, de nouveaux champs ont été ajoutés aux deux currentOp lsidtransactionThreadId, et un nouvel état pour « idle transaction » et serverStatus les transactions : currentActivecurrentInactive,currentOpen,totalAborted,totalCommitted, ettotalStarted.

Niveau d'isolation des transactions

Lorsque vous démarrez une transaction, il est possible de spécifier à la fois le readConcern et writeConcern comme indiqué dans l'exemple ci-dessous :

mySession.startTransaction({readConcern: {level: 'snapshot'}, writeConcern: {w: 'majority'}});

En effetreadConcern, Amazon DocumentDB prend en charge l'isolation des instantanés par défaut. Si un readConcern niveau local, disponible ou majoritaire est spécifié, Amazon DocumentDB passe au readConcern niveau snapshot. Amazon DocumentDB ne prend pas en charge le linéarisable readConcern et la spécification d'un tel problème de lecture entraînera une erreur.

En writeConcern effet, Amazon DocumentDB prend en charge la majorité par défaut et un quorum d'écriture est atteint lorsque quatre copies des données sont conservées sur trois. AZs Si une valeur inférieure writeConcern est spécifiée, Amazon DocumentDB passera writeConcern à la majorité. De plus, toutes les écritures Amazon DocumentDB sont journalisées et la journalisation ne peut pas être désactivée.

Cas d’utilisation

Dans cette section, nous allons passer en revue deux cas d'utilisation pour les transactions : les relevés multiples et les collectes multiples.

Transactions comportant plusieurs relevés

Les transactions Amazon DocumentDB sont multi-instructions, ce qui signifie que vous pouvez écrire une transaction qui couvre plusieurs instructions avec un commit ou un rollback explicite. Vous pouvez regrouperinsert, updatedelete, et les findAndModify actions sous la forme d'une seule opération atomique.

Un cas d'utilisation courant pour les transactions à états multiples est une transaction de débit-crédit. Par exemple : vous devez de l'argent à un ami pour des vêtements. Ainsi, vous devez débiter (retirer) 500$ de votre compte et créditer 500$ (dépôt) sur le compte de votre ami. Pour effectuer cette opération, vous devez effectuer à la fois les opérations de débit et de crédit dans le cadre d'une seule transaction afin de garantir l'atomicité. Cela permet d'éviter les scénarios dans lesquels 500$ sont débités de votre compte, mais pas crédités sur le compte de votre ami. Voici à quoi ressemblerait ce cas d'utilisation :

// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario*** // Setup bank account for Alice and Bob. Each have $1000 in their account var databaseName = "bank"; var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var bankDB = session.getDatabase(databaseName); var accountColl = bankDB[collectionName]; accountColl.drop(); accountColl.insert({name: "Alice", balance: 1000}); accountColl.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountColl.find({"name": "Alice"}).next().balance; var newAliceBalance = aliceBalance - amountToTransfer; accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance; // add $500 to Bob's account var bobBalance = accountColl.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountColl.find({"name": "Bob"}).next().balance; session.commitTransaction(); accountColl.find(); // *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario*** // Setup bank account for Alice and Bob. Each have $1000 in their account var databaseName = "bank"; var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var bankDB = session.getDatabase(databaseName); var accountColl = bankDB[collectionName]; accountColl.drop(); accountColl.insert({name: "Alice", balance: 1000}); accountColl.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountColl.find({"name": "Alice"}).next().balance; var newAliceBalance = aliceBalance - amountToTransfer; accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance; session.abortTransaction();

Transactions de collecte multiple

Nos transactions sont également multi-collections, ce qui signifie qu'elles peuvent être utilisées pour effectuer plusieurs opérations dans le cadre d'une seule transaction et sur plusieurs collections. Cela fournit une vue cohérente des données et préserve l'intégrité de vos données. Lorsque vous validez les commandes en une seule fois<>, les transactions sont all-or-nothing des exécutions, en ce sens qu'elles aboutissent toutes ou échouent.

Voici un exemple de transactions à collectes multiples, utilisant le même scénario et les mêmes données que l'exemple pour les transactions à relevés multiples.

// *** Transfer $500 from Alice to Bob inside a transaction: Success Scenario*** // Setup bank account for Alice and Bob. Each have $1000 in their account var amountToTransfer = 500; var collectionName = "account"; var session = db.getMongo().startSession({causalConsistency: false}); var accountCollInBankA = session.getDatabase("bankA")[collectionName]; var accountCollInBankB = session.getDatabase("bankB")[collectionName]; accountCollInBankA.drop(); accountCollInBankB.drop(); accountCollInBankA.insert({name: "Alice", balance: 1000}); accountCollInBankB.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance; var newAliceBalance = aliceBalance - amountToTransfer; accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance; // add $500 to Bob's account var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance; session.commitTransaction(); accountCollInBankA.find(); // Alice holds $500 in bankA accountCollInBankB.find(); // Bob holds $1500 in bankB // *** Transfer $500 from Alice to Bob inside a transaction: Failure Scenario*** // Setup bank account for Alice and Bob. Each have $1000 in their account var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var accountCollInBankA = session.getDatabase("bankA")[collectionName]; var accountCollInBankB = session.getDatabase("bankB")[collectionName]; accountCollInBankA.drop(); accountCollInBankB.drop(); accountCollInBankA.insert({name: "Alice", balance: 1000}); accountCollInBankB.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance; var newAliceBalance = aliceBalance - amountToTransfer; accountCollInBankA.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountCollInBankA.find({"name": "Alice"}).next().balance; // add $500 to Bob's account var bobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountCollInBankB.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountCollInBankB.find({"name": "Bob"}).next().balance; session.abortTransaction(); accountCollInBankA.find(); // Alice holds $1000 in bankA accountCollInBankB.find(); // Bob holds $1000 in bankB

Exemples d'API de transaction pour l'API de rappel

L'API de rappel n'est disponible que pour les pilotes 4.2+.

Javascript

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Javascript.

// *** Transfer $500 from Alice to Bob inside a transaction: Success *** // Setup bank account for Alice and Bob. Each have $1000 in their account var databaseName = "bank"; var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var bankDB = session.getDatabase(databaseName); var accountColl = bankDB[collectionName]; accountColl.drop(); accountColl.insert({name: "Alice", balance: 1000}); accountColl.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountColl.find({"name": "Alice"}).next().balance; assert(aliceBalance >= amountToTransfer); var newAliceBalance = aliceBalance - amountToTransfer; accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance; assert.eq(newAliceBalance, findAliceBalance); // add $500 to Bob's account var bobBalance = accountColl.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountColl.find({"name": "Bob"}).next().balance; assert.eq(newBobBalance, findBobBalance); session.commitTransaction(); accountColl.find();
Node.js

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Node.js.

// Node.js callback API: const bankDB = await mongoclient.db("bank"); var accountColl = await bankDB.createCollection("account"); var amountToTransfer = 500; const session = mongoclient.startSession({causalConsistency: false}); await accountColl.drop(); await accountColl.insertOne({name: "Alice", balance: 1000}, { session }); await accountColl.insertOne({name: "Bob", balance: 1000}, { session }); const transactionOptions = { readConcern: { level: 'snapshot' }, writeConcern: { w: 'majority' } }; // deduct $500 from Alice's account var aliceBalance = await accountColl.findOne({name: "Alice"}, {session}); assert(aliceBalance.balance >= amountToTransfer); var newAliceBalance = aliceBalance - amountToTransfer; session.startTransaction(transactionOptions); await accountColl.updateOne({name: "Alice"}, {$set: {balance: newAliceBalance}}, {session }); await session.commitTransaction(); aliceBalance = await accountColl.findOne({name: "Alice"}, {session}); assert(newAliceBalance == aliceBalance.balance); // add $500 to Bob's account var bobBalance = await accountColl.findOne({name: "Bob"}, {session}); var newBobBalance = bobBalance.balance + amountToTransfer; session.startTransaction(transactionOptions); await accountColl.updateOne({name: "Bob"}, {$set: {balance: newBobBalance}}, {session }); await session.commitTransaction(); bobBalance = await accountColl.findOne({name: "Bob"}, {session}); assert(newBobBalance == bobBalance.balance);
C#

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec C#.

// C# Callback API var dbName = "bank"; var collName = "account"; var amountToTransfer = 500; using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false})) { var bankDB = client.GetDatabase(dbName); var accountColl = bankDB.GetCollection<BsonDocument>(collName); bankDB.DropCollection(collName); accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } }); accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } }); // start transaction var transactionOptions = new TransactionOptions( readConcern: ReadConcern.Snapshot, writeConcern: WriteConcern.WMajority); var result = session.WithTransaction( (sess, cancellationtoken) => { // deduct $500 from Alice's account var aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceBalance >= amountToTransfer); var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer; accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice"), Builders<BsonDocument>.Update.Set("balance", newAliceBalance)); aliceBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceBalance == newAliceBalance); // add $500 from Bob's account var bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); var newBobBalance = bobBalance.AsInt32 + amountToTransfer; accountColl.UpdateOne(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob"), Builders<BsonDocument>.Update.Set("balance", newBobBalance)); bobBalance = accountColl.Find(sess, Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); Debug.Assert(bobBalance == newBobBalance); return "Transaction committed"; }, transactionOptions); // check values outside of transaction var aliceNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); var bobNewBalance = accountColl.Find(Builders<BsonDocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceNewBalance == 500); Debug.Assert(bobNewBalance == 1500); }
Ruby

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Ruby.

// Ruby Callback API dbName = "bank" collName = "account" amountToTransfer = 500 session = client.start_session(:causal_consistency=> false) bankDB = Mongo::Database.new(client, dbName) accountColl = bankDB[collName] accountColl.drop() accountColl.insert_one({"name"=>"Alice", "balance"=>1000}) accountColl.insert_one({"name"=>"Bob", "balance"=>1000}) # start transaction session.with_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) do # deduct $500 from Alice's account aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance'] assert aliceBalance >= amountToTransfer newAliceBalance = aliceBalance - amountToTransfer accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session) aliceBalance = accountColl.find({"name"=>>"Alice"}, :session=> session).first['balance'] assert_equal(newAliceBalance, aliceBalance) # add $500 from Bob's account bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance'] newBobBalance = bobBalance + amountToTransfer accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session) bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance'] assert_equal(newBobBalance, bobBalance) end # check results outside of transaction aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance'] bobBalance = accountColl.find({"name"=>"Bob"}).first['balance'] assert_equal(aliceBalance, 500) assert_equal(bobBalance, 1500) session.end_session
Go

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Go.

// Go - Callback API type Account struct { Name string Balance int } ctx := context.TODO() dbName := "bank" collName := "account" amountToTransfer := 500 session, err := client.StartSession(options.Session().SetCausalConsistency(false)) assert.NilError(t, err) defer session.EndSession(ctx) bankDB := client.Database(dbName) accountColl := bankDB.Collection(collName) accountColl.Drop(ctx) _, err = accountColl.InsertOne(ctx, bson.M{"name" : "Alice", "balance":1000}) _, err = accountColl.InsertOne(ctx, bson.M{"name" : "Bob", "balance":1000}) transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.New(writeconcern.WMajority())) _, err = session.WithTransaction(ctx, func(sessionCtx mongo.SessionContext) (interface{}, error) { var result Account // deduct $500 from Alice's account err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result) aliceBalance := result.Balance newAliceBalance := aliceBalance - amountToTransfer _, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}}) err = accountColl.FindOne(sessionCtx, bson.M{"name": "Alice"}).Decode(&result) aliceBalance = result.Balance assert.Equal(t, aliceBalance, newAliceBalance) // add $500 to Bob's account err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result) bobBalance := result.Balance newBobBalance := bobBalance + amountToTransfer _, err = accountColl.UpdateOne(sessionCtx, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}}) err = accountColl.FindOne(sessionCtx, bson.M{"name": "Bob"}).Decode(&result) bobBalance = result.Balance assert.Equal(t, bobBalance, newBobBalance) if err != nil { return nil, err } return "transaction committed", err }, transactionOptions) // check results outside of transaction var result Account err = accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result) aliceNewBalance := result.Balance err = accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result) bobNewBalance := result.Balance assert.Equal(t, aliceNewBalance, 500) assert.Equal(t, bobNewBalance, 1500)
Java

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Java.

// Java (sync) - Callback API MongoDatabase bankDB = mongoClient.getDatabase("bank"); MongoCollection accountColl = bankDB.getCollection("account"); accountColl.drop(); int amountToTransfer = 500; // add sample data accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)); accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)); TransactionOptions txnOptions = TransactionOptions.builder() .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build(); ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build(); try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) { clientSession.withTransaction(new TransactionBody<Void>() { @Override public Void execute() { // deduct $500 from Alice's account List<Document> documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Alice")).into(documentList); int aliceBalance = (int) documentList.get(0).get("balance"); int newAliceBalance = aliceBalance - amountToTransfer; accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))); // check Alice's new balance documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Alice")).into(documentList); int updatedBalance = (int) documentList.get(0).get("balance"); Assert.assertEquals(updatedBalance, newAliceBalance); // add $500 to Bob's account documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Bob")).into(documentList); int bobBalance = (int) documentList.get(0).get("balance"); int newBobBalance = bobBalance + amountToTransfer; accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))); // check Bob's new balance documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Bob")).into(documentList); updatedBalance = (int) documentList.get(0).get("balance"); Assert.assertEquals(updatedBalance, newBobBalance); return null; } }, txnOptions); }
C

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec C.

// Sample Code for C with Callback #include <bson.h> #include <mongoc.h> #include <stdio.h> #include <string.h> #include <assert.h> typedef struct { int64_t balance; bson_t *account; bson_t *opts; mongoc_collection_t *collection; } ctx_t; bool callback_session (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error) { bool r = true; ctx_t *data = (ctx_t *) ctx; bson_t local_reply; bson_t *selector = data->account; bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (data->balance), "}"); mongoc_collection_update_one (data->collection, selector, update, data->opts, &local_reply, error); *reply = bson_copy (&local_reply); bson_destroy (&local_reply); bson_destroy (update); return r; } void test_callback_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){ bson_t reply; bool r = true; const bson_t *doc; bson_iter_t iter; ctx_t alice_ctx; ctx_t bob_ctx; bson_error_t error; // find query bson_t *alice_query = bson_new (); BSON_APPEND_UTF8(alice_query, "name", "Alice"); bson_t *bob_query = bson_new (); BSON_APPEND_UTF8(bob_query, "name", "Bob"); // create session // set causal consistency to false mongoc_session_opt_t *session_opts = mongoc_session_opts_new (); mongoc_session_opts_set_causal_consistency (session_opts, false); // start the session mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error); // add session to options bson_t *opts = bson_new(); mongoc_client_session_append (client_session, opts, &error); // deduct 500 from Alice // find account balance of Alice mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64; assert(alice_balance >= amount_to_transfer); int64_t new_alice_balance = alice_balance - amount_to_transfer; // set variables which will be used by callback function alice_ctx.collection = collection; alice_ctx.opts = opts; alice_ctx.balance = new_alice_balance; alice_ctx.account = alice_query; // callback r = mongoc_client_session_with_transaction (client_session, &callback_session, NULL, &alice_ctx, &reply, &error); assert(r); // find account balance of Alice after transaction cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); alice_balance = (bson_iter_value (&iter))->value.v_int64; assert(alice_balance == new_alice_balance); assert(alice_balance == 500); // add 500 to bob's balance // find account balance of Bob cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64; int64_t new_bob_balance = bob_balance + amount_to_transfer; bob_ctx.collection = collection; bob_ctx.opts = opts; bob_ctx.balance = new_bob_balance; bob_ctx.account = bob_query; // set read & write concern mongoc_read_concern_t *read_concern = mongoc_read_concern_new (); mongoc_write_concern_t *write_concern = mongoc_write_concern_new (); mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new (); mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY); mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT); mongoc_transaction_opts_set_write_concern (txn_opts, write_concern); mongoc_transaction_opts_set_read_concern (txn_opts, read_concern); // callback r = mongoc_client_session_with_transaction (client_session, &callback_session, txn_opts, &bob_ctx, &reply, &error); assert(r); // find account balance of Bob after transaction cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); bob_balance = (bson_iter_value (&iter))->value.v_int64; assert(bob_balance == new_bob_balance); assert(bob_balance == 1500); // cleanup bson_destroy(alice_query); bson_destroy(bob_query); mongoc_client_session_destroy(client_session); bson_destroy(opts); mongoc_transaction_opts_destroy(txn_opts); mongoc_read_concern_destroy(read_concern); mongoc_write_concern_destroy(write_concern); mongoc_cursor_destroy(cursor); bson_destroy(doc); } int main(int argc, char* argv[]) { mongoc_init (); mongoc_client_t* client = mongoc_client_new (<connection uri>); bson_error_t error; // connect to bank db mongoc_database_t *database = mongoc_client_get_database (client, "bank"); // access account collection mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account"); // set amount to transfer int64_t amount_to_transfer = 500; // delete the collection if already existing mongoc_collection_drop(collection, &error); // open Alice account bson_t *alice_account = bson_new (); BSON_APPEND_UTF8(alice_account, "name", "Alice"); BSON_APPEND_INT64(alice_account, "balance", 1000); // open Bob account bson_t *bob_account = bson_new (); BSON_APPEND_UTF8(bob_account, "name", "Bob"); BSON_APPEND_INT64(bob_account, "balance", 1000); bool r = true; r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error); if (!r) {printf("Error encountered:%s", error.message);} r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error); if (!r) {printf("Error encountered:%s", error.message);} test_callback_money_transfer(client, collection, amount_to_transfer); }
Python

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Python.

// Sample Python code with callback api import pymongo def callback(session, balance, query): collection.update_one(query, {'$set': {"balance": balance}}, session=session) client = pymongo.MongoClient(<connection uri>) rc_snapshot = pymongo.read_concern.ReadConcern('snapshot') wc_majority = pymongo.write_concern.WriteConcern('majority') # To start, drop and create an account collection and insert balances for both Alice and Bob collection = client.get_database("bank").get_collection("account") collection.drop() collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000}) collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000}) amount_to_transfer = 500 # deduct 500 from Alice's account alice_balance = collection.find_one({"name": "Alice"}).get("balance") assert alice_balance >= amount_to_transfer new_alice_balance = alice_balance - amount_to_transfer with client.start_session({'causalConsistency':False}) as session: session.with_transaction(lambda s: callback(s, new_alice_balance, {"name": "Alice"}), read_concern=rc_snapshot, write_concern=wc_majority) updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance") assert updated_alice_balance == new_alice_balance # add 500 to Bob's account bob_balance = collection.find_one({"name": "Bob"}).get("balance") assert bob_balance >= amount_to_transfer new_bob_balance = bob_balance + amount_to_transfer with client.start_session({'causalConsistency':False}) as session: session.with_transaction(lambda s: callback(s, new_bob_balance, {"name": "Bob"}), read_concern=rc_snapshot, write_concern=wc_majority) updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance") assert updated_bob_balance == new_bob_balance

Exemples d'API de transaction pour l'API principale

Javascript

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Javascript.

// *** Transfer $500 from Alice to Bob inside a transaction: Success *** // Setup bank account for Alice and Bob. Each have $1000 in their account var databaseName = "bank"; var collectionName = "account"; var amountToTransfer = 500; var session = db.getMongo().startSession({causalConsistency: false}); var bankDB = session.getDatabase(databaseName); var accountColl = bankDB[collectionName]; accountColl.drop(); accountColl.insert({name: "Alice", balance: 1000}); accountColl.insert({name: "Bob", balance: 1000}); session.startTransaction(); // deduct $500 from Alice's account var aliceBalance = accountColl.find({"name": "Alice"}).next().balance; assert(aliceBalance >= amountToTransfer); var newAliceBalance = aliceBalance - amountToTransfer; accountColl.update({"name": "Alice"},{"$set": {"balance": newAliceBalance}}); var findAliceBalance = accountColl.find({"name": "Alice"}).next().balance; assert.eq(newAliceBalance, findAliceBalance); // add $500 to Bob's account var bobBalance = accountColl.find({"name": "Bob"}).next().balance; var newBobBalance = bobBalance + amountToTransfer; accountColl.update({"name": "Bob"},{"$set": {"balance": newBobBalance}}); var findBobBalance = accountColl.find({"name": "Bob"}).next().balance; assert.eq(newBobBalance, findBobBalance); session.commitTransaction(); accountColl.find();
C#

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec C#.

// C# Core API public void TransferMoneyWithRetry(IMongoCollection<bSondocument> accountColl, IClientSessionHandle session) { var amountToTransfer = 500; // start transaction var transactionOptions = new TransactionOptions( readConcern: ReadConcern.Snapshot, writeConcern: WriteConcern.WMajority); session.StartTransaction(transactionOptions); try { // deduct $500 from Alice's account var aliceBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceBalance >= amountToTransfer); var newAliceBalance = aliceBalance.AsInt32 - amountToTransfer; accountColl.UpdateOne(session, Builders<bSondocument>.Filter.Eq("name", "Alice"), Builders<bSondocument>.Update.Set("balance", newAliceBalance)); aliceBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceBalance == newAliceBalance); // add $500 from Bob's account var bobBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); var newBobBalance = bobBalance.AsInt32 + amountToTransfer; accountColl.UpdateOne(session, Builders<bSondocument>.Filter.Eq("name", "Bob"), Builders<bSondocument>.Update.Set("balance", newBobBalance)); bobBalance = accountColl.Find(session, Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); Debug.Assert(bobBalance == newBobBalance); } catch (Exception e) { session.AbortTransaction(); throw; } session.CommitTransaction(); } } public void DoTransactionWithRetry(MongoClient client) { var dbName = "bank"; var collName = "account"; using (var session = client.StartSession(new ClientSessionOptions{CausalConsistency = false})) { try { var bankDB = client.GetDatabase(dbName); var accountColl = bankDB.GetCollection<bSondocument>(collName); bankDB.DropCollection(collName); accountColl.InsertOne(session, new BsonDocument { {"name", "Alice"}, {"balance", 1000 } }); accountColl.InsertOne(session, new BsonDocument { {"name", "Bob"}, {"balance", 1000 } }); while(true) { try { TransferMoneyWithRetry(accountColl, session); break; } catch (MongoException e) { if(e.HasErrorLabel("TransientTransactionError")) { continue; } else { throw; } } } // check values outside of transaction var aliceNewBalance = accountColl.Find(Builders<bSondocument>.Filter.Eq("name", "Alice")).FirstOrDefault().GetValue("balance"); var bobNewBalance = accountColl.Find(Builders<bSondocument>.Filter.Eq("name", "Bob")).FirstOrDefault().GetValue("balance"); Debug.Assert(aliceNewBalance == 500); Debug.Assert(bobNewBalance == 1500); } catch (Exception e) { Console.WriteLine("Error running transaction: " + e.Message); } } }
Ruby

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Ruby.

# Ruby Core API def transfer_money_w_retry(session, accountColl) amountToTransfer = 500 session.start_transaction(read_concern: {level: :snapshot}, write_concern: {w: :majority}) # deduct $500 from Alice's account aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance'] assert aliceBalance >= amountToTransfer newAliceBalance = aliceBalance - amountToTransfer accountColl.update_one({"name"=>"Alice"}, { "$set" => {"balance"=>newAliceBalance} }, :session=> session) aliceBalance = accountColl.find({"name"=>"Alice"}, :session=> session).first['balance'] assert_equal(newAliceBalance, aliceBalance) # add $500 to Bob's account bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance'] newBobBalance = bobBalance + amountToTransfer accountColl.update_one({"name"=>"Bob"}, { "$set" => {"balance"=>newBobBalance} }, :session=> session) bobBalance = accountColl.find({"name"=>"Bob"}, :session=> session).first['balance'] assert_equal(newBobBalance, bobBalance) session.commit_transaction end def do_txn_w_retry(client) dbName = "bank" collName = "account" session = client.start_session(:causal_consistency=> false) bankDB = Mongo::Database.new(client, dbName) accountColl = bankDB[collName] accountColl.drop() accountColl.insert_one({"name"=>"Alice", "balance"=>1000}) accountColl.insert_one({"name"=>"Bob", "balance"=>1000}) begin transferMoneyWithRetry(session, accountColl) puts "transaction committed" rescue Mongo::Error => e if e.label?('TransientTransactionError') retry else puts "transaction failed" raise end end # check results outside of transaction aliceBalance = accountColl.find({"name"=>"Alice"}).first['balance'] bobBalance = accountColl.find({"name"=>"Bob"}).first['balance'] assert_equal(aliceBalance, 500) assert_equal(bobBalance, 1500) end
Go

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Go.

// Go - Core API type Account struct { Name string Balance int } func transferMoneyWithRetry(sessionContext mongo.SessionContext, accountColl *mongo.Collection, t *testing.T) error { amountToTransfer := 500 transactionOptions := options.Transaction().SetReadConcern(readconcern.Snapshot()). SetWriteConcern(writeconcern.New(writeconcern.WMajority())) if err := sessionContext.StartTransaction(transactionOptions); err != nil { panic(err) } var result Account // deduct $500 from Alice's account err := accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result) aliceBalance := result.Balance newAliceBalance := aliceBalance - amountToTransfer _, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Alice"}, bson.M{"$set": bson.M{"balance": newAliceBalance}}) if err != nil { sessionContext.AbortTransaction(sessionContext) } err = accountColl.FindOne(sessionContext, bson.M{"name": "Alice"}).Decode(&result) aliceBalance = result.Balance assert.Equal(t, aliceBalance, newAliceBalance) // add $500 to Bob's account err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result) bobBalance := result.Balance newBobBalance := bobBalance + amountToTransfer _, err = accountColl.UpdateOne(sessionContext, bson.M{"name": "Bob"}, bson.M{"$set": bson.M{"balance": newBobBalance}}) if err != nil { sessionContext.AbortTransaction(sessionContext) } err = accountColl.FindOne(sessionContext, bson.M{"name": "Bob"}).Decode(&result) bobBalance = result.Balance assert.Equal(t, bobBalance, newBobBalance) err = sessionContext.CommitTransaction(sessionContext) return err } func doTransactionWithRetry(t *testing.T) { ctx := context.TODO() dbName := "bank" collName := "account" bankDB := client.Database(dbName) accountColl := bankDB.Collection(collName) client.UseSessionWithOptions(ctx, options.Session().SetCausalConsistency(false), func(sessionContext mongo.SessionContext) error { accountColl.Drop(ctx) accountColl.InsertOne(sessionContext, bson.M{"name" : "Alice", "balance":1000}) accountColl.InsertOne(sessionContext, bson.M{"name" : "Bob", "balance":1000}) for { err := transferMoneyWithRetry(sessionContext, accountColl, t) if err == nil { println("transaction committed") return nil } if mongoErr := err.(mongo.CommandError); mongoErr.HasErrorLabel("TransientTransactionError") { continue } println("transaction failed") return err } }) // check results outside of transaction var result Account accountColl.FindOne(ctx, bson.M{"name": "Alice"}).Decode(&result) aliceBalance := result.Balance assert.Equal(t, aliceBalance, 500) accountColl.FindOne(ctx, bson.M{"name": "Bob"}).Decode(&result) bobBalance := result.Balance assert.Equal(t, bobBalance, 1500) }
Java

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Java.

// Java (sync) - Core API public void transferMoneyWithRetry() { // connect to server MongoClientURI mongoURI = new MongoClientURI(uri); MongoClient mongoClient = new MongoClient(mongoURI); MongoDatabase bankDB = mongoClient.getDatabase("bank"); MongoCollection accountColl = bankDB.getCollection("account"); accountColl.drop(); // insert some sample data accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)); accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)); while (true) { try { doTransferMoneyWithRetry(accountColl, mongoClient); break; } catch (MongoException e) { if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) { continue; } else { throw e; } } } } public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) { int amountToTransfer = 500; TransactionOptions txnOptions = TransactionOptions.builder() .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build(); ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build(); try ( ClientSession clientSession = mongoClient.startSession(sessionOptions) ) { clientSession.startTransaction(txnOptions); // deduct $500 from Alice's account List<Document> documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Alice")).into(documentList); int aliceBalance = (int) documentList.get(0).get("balance"); Assert.assertTrue(aliceBalance >= amountToTransfer); int newAliceBalance = aliceBalance - amountToTransfer; accountColl.updateOne(clientSession, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))); // check Alice's new balance documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Alice")).into(documentList); int updatedBalance = (int) documentList.get(0).get("balance"); Assert.assertEquals(updatedBalance, newAliceBalance); // add $500 to Bob's account documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Bob")).into(documentList); int bobBalance = (int) documentList.get(0).get("balance"); int newBobBalance = bobBalance + amountToTransfer; accountColl.updateOne(clientSession, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))); // check Bob's new balance documentList = new ArrayList<>(); accountColl.find(clientSession, new Document("name", "Bob")).into(documentList); updatedBalance = (int) documentList.get(0).get("balance"); Assert.assertEquals(updatedBalance, newBobBalance); // commit transaction clientSession.commitTransaction(); } } // Java (async) -- Core API public void transferMoneyWithRetry() { // connect to the server MongoClient mongoClient = MongoClients.create(uri); MongoDatabase bankDB = mongoClient.getDatabase("bank"); MongoCollection accountColl = bankDB.getCollection("account"); SubscriberLatchWrapper<Void> dropCallback = new SubscriberLatchWrapper<>(); mongoClient.getDatabase("bank").drop().subscribe(dropCallback); dropCallback.await(); // insert some sample data SubscriberLatchWrapper<InsertOneResult> insertionCallback = new SubscriberLatchWrapper<>(); accountColl.insertOne(new Document("name", "Alice").append("balance", 1000)).subscribe(insertionCallback); insertionCallback.await(); insertionCallback = new SubscriberLatchWrapper<>(); accountColl.insertOne(new Document("name", "Bob").append("balance", 1000)).subscribe(insertionCallback);; insertionCallback.await(); while (true) { try { doTransferMoneyWithRetry(accountColl, mongoClient); break; } catch (MongoException e) { if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) { continue; } else { throw e; } } } } public void doTransferMoneyWithRetry(MongoCollection accountColl, MongoClient mongoClient) { int amountToTransfer = 500; // start the transaction TransactionOptions txnOptions = TransactionOptions.builder() .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build(); ClientSessionOptions sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build(); SubscriberLatchWrapper<ClientSession> sessionCallback = new SubscriberLatchWrapper<>(); mongoClient.startSession(sessionOptions).subscribe(sessionCallback); ClientSession session = sessionCallback.get().get(0); session.startTransaction(txnOptions); // deduct $500 from Alice's account SubscriberLatchWrapper<Document> findCallback = new SubscriberLatchWrapper<>(); accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback); Document documentFound = findCallback.get().get(0); int aliceBalance = (int) documentFound.get("balance"); int newAliceBalance = aliceBalance - amountToTransfer; SubscriberLatchWrapper<UpdateResult> updateCallback = new SubscriberLatchWrapper<>(); accountColl.updateOne(session, new Document("name", "Alice"), new Document("$set", new Document("balance", newAliceBalance))).subscribe(updateCallback); updateCallback.await(); // check Alice's new balance findCallback = new SubscriberLatchWrapper<>(); accountColl.find(session, new Document("name", "Alice")).first().subscribe(findCallback); documentFound = findCallback.get().get(0); int updatedBalance = (int) documentFound.get("balance"); Assert.assertEquals(updatedBalance, newAliceBalance); // add $500 to Bob's account findCallback = new SubscriberLatchWrapper<>(); accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback); documentFound = findCallback.get().get(0); int bobBalance = (int) documentFound.get("balance"); int newBobBalance = bobBalance + amountToTransfer; updateCallback = new SubscriberLatchWrapper<>(); accountColl.updateOne(session, new Document("name", "Bob"), new Document("$set", new Document("balance", newBobBalance))).subscribe(updateCallback); updateCallback.await(); // check Bob's new balance findCallback = new SubscriberLatchWrapper<>(); accountColl.find(session, new Document("name", "Bob")).first().subscribe(findCallback); documentFound = findCallback.get().get(0); updatedBalance = (int) documentFound.get("balance"); Assert.assertEquals(updatedBalance, newBobBalance); // commit the transaction SubscriberLatchWrapper<Void> transactionCallback = new SubscriberLatchWrapper<>(); session.commitTransaction().subscribe(transactionCallback); transactionCallback.await(); } public class SubscriberLatchWrapper<T> implements Subscriber<T> { /** * A Subscriber that stores the publishers results and provides a latch so can block on completion. * * @param <T> The publishers result type */ private final List<T> received; private final List<RuntimeException> errors; private final CountDownLatch latch; private volatile Subscription subscription; private volatile boolean completed; /** * Construct an instance */ public SubscriberLatchWrapper() { this.received = new ArrayList<>(); this.errors = new ArrayList<>(); this.latch = new CountDownLatch(1); } @Override public void onSubscribe(final Subscription s) { subscription = s; subscription.request(Integer.MAX_VALUE); } @Override public void onNext(final T t) { received.add(t); } @Override public void onError(final Throwable t) { if (t instanceof RuntimeException) { errors.add((RuntimeException) t); } else { errors.add(new RuntimeException("Unexpected exception", t)); } onComplete(); } @Override public void onComplete() { completed = true; subscription.cancel(); latch.countDown(); } /** * Get received elements * * @return the list of received elements */ public List<T> getReceived() { return received; } /** * Get received elements. * * @return the list of receive elements */ public List<T> get() { return await().getReceived(); } /** * Await completion or error * * @return this */ public SubscriberLatchWrapper<T> await() { subscription.request(Integer.MAX_VALUE); try { if (!latch.await(300, TimeUnit.SECONDS)) { throw new MongoTimeoutException("Publisher onComplete timed out for 300 seconds"); } } catch (InterruptedException e) { throw new MongoInterruptedException("Interrupted waiting for observeration", e); } if (!errors.isEmpty()) { throw errors.get(0); } return this; } public boolean getCompleted() { return this.completed; } public void close() { subscription.cancel(); received.clear(); } }
C

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec C.

// Sample C code with core session bool core_session(mongoc_client_session_t *client_session, mongoc_collection_t* collection, bson_t *selector, int64_t balance){ bool r = true; bson_error_t error; bson_t *opts = bson_new(); bson_t *update = BCON_NEW ("$set", "{", "balance", BCON_INT64 (balance), "}"); // set read & write concern mongoc_read_concern_t *read_concern = mongoc_read_concern_new (); mongoc_write_concern_t *write_concern = mongoc_write_concern_new (); mongoc_transaction_opt_t *txn_opts = mongoc_transaction_opts_new (); mongoc_write_concern_set_w(write_concern, MONGOC_WRITE_CONCERN_W_MAJORITY); mongoc_read_concern_set_level(read_concern, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT); mongoc_transaction_opts_set_write_concern (txn_opts, write_concern); mongoc_transaction_opts_set_read_concern (txn_opts, read_concern); mongoc_client_session_start_transaction (client_session, txn_opts, &error); mongoc_client_session_append (client_session, opts, &error); r = mongoc_collection_update_one (collection, selector, update, opts, NULL, &error); mongoc_client_session_commit_transaction (client_session, NULL, &error); bson_destroy (opts); mongoc_transaction_opts_destroy(txn_opts); mongoc_read_concern_destroy(read_concern); mongoc_write_concern_destroy(write_concern); bson_destroy (update); return r; } void test_core_money_transfer(mongoc_client_t* client, mongoc_collection_t* collection, int amount_to_transfer){ bson_t reply; bool r = true; const bson_t *doc; bson_iter_t iter; bson_error_t error; // find query bson_t *alice_query = bson_new (); BSON_APPEND_UTF8(alice_query, "name", "Alice"); bson_t *bob_query = bson_new (); BSON_APPEND_UTF8(bob_query, "name", "Bob"); // create session // set causal consistency to false mongoc_session_opt_t *session_opts = mongoc_session_opts_new (); mongoc_session_opts_set_causal_consistency (session_opts, false); // start the session mongoc_client_session_t *client_session = mongoc_client_start_session (client, session_opts, &error); // add session to options bson_t *opts = bson_new(); mongoc_client_session_append (client_session, opts, &error); // deduct 500 from Alice // find account balance of Alice mongoc_cursor_t *cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); int64_t alice_balance = (bson_iter_value (&iter))->value.v_int64; assert(alice_balance >= amount_to_transfer); int64_t new_alice_balance = alice_balance - amount_to_transfer; // core r = core_session (client_session, collection, alice_query, new_alice_balance); assert(r); // find account balance of Alice after transaction cursor = mongoc_collection_find_with_opts (collection, alice_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); alice_balance = (bson_iter_value (&iter))->value.v_int64; assert(alice_balance == new_alice_balance); assert(alice_balance == 500); // add 500 to Bob's balance // find account balance of Bob cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); int64_t bob_balance = (bson_iter_value (&iter))->value.v_int64; int64_t new_bob_balance = bob_balance + amount_to_transfer; //core r = core_session (client_session, collection, bob_query, new_bob_balance); assert(r); // find account balance of Bob after transaction cursor = mongoc_collection_find_with_opts (collection, bob_query, NULL, NULL); mongoc_cursor_next (cursor, &doc); bson_iter_init (&iter, doc); bson_iter_find (&iter, "balance"); bob_balance = (bson_iter_value (&iter))->value.v_int64; assert(bob_balance == new_bob_balance); assert(bob_balance == 1500); // cleanup bson_destroy(alice_query); bson_destroy(bob_query); mongoc_client_session_destroy(client_session); bson_destroy(opts); mongoc_cursor_destroy(cursor); bson_destroy(doc); } int main(int argc, char* argv[]) { mongoc_init (); mongoc_client_t* client = mongoc_client_new (<connection uri>); bson_error_t error; // connect to bank db mongoc_database_t *database = mongoc_client_get_database (client, "bank"); // access account collection mongoc_collection_t* collection = mongoc_client_get_collection(client, "bank", "account"); // set amount to transfer int64_t amount_to_transfer = 500; // delete the collection if already existing mongoc_collection_drop(collection, &error); // open Alice account bson_t *alice_account = bson_new (); BSON_APPEND_UTF8(alice_account, "name", "Alice"); BSON_APPEND_INT64(alice_account, "balance", 1000); // open Bob account bson_t *bob_account = bson_new (); BSON_APPEND_UTF8(bob_account, "name", "Bob"); BSON_APPEND_INT64(bob_account, "balance", 1000); bool r = true; r = mongoc_collection_insert_one(collection, alice_account, NULL, NULL, &error); if (!r) {printf("Error encountered:%s", error.message);} r = mongoc_collection_insert_one(collection, bob_account, NULL, NULL, &error); if (!r) {printf("Error encountered:%s", error.message);} test_core_money_transfer(client, collection, amount_to_transfer); }
Scala

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Scala.

// Scala Core API def transferMoneyWithRetry(sessionObservable: SingleObservable[ClientSession] , database: MongoDatabase ): Unit = { val accountColl = database.getCollection("account") var amountToTransfer = 500 var transactionObservable: Observable[ClientSession] = sessionObservable.map(clientSession => { clientSession.startTransaction() // deduct $500 from Alice's account var aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance") assert(aliceBalance >= amountToTransfer) var newAliceBalance = aliceBalance - amountToTransfer accountColl.updateOne(clientSession, Document("name" -> "Alice"), Document("$set" -> Document("balance" -> newAliceBalance))).await() aliceBalance = accountColl.find(clientSession, Document("name" -> "Alice")).await().head.getInteger("balance") assert(aliceBalance == newAliceBalance) // add $500 to Bob's account var bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance") var newBobBalance = bobBalance + amountToTransfer accountColl.updateOne(clientSession, Document("name" -> "Bob"), Document("$set" -> Document("balance" -> newBobBalance))).await() bobBalance = accountColl.find(clientSession, Document("name" -> "Bob")).await().head.getInteger("balance") assert(bobBalance == newBobBalance) clientSession }) transactionObservable.flatMap(clientSession => clientSession.commitTransaction()).await() } def doTransactionWithRetry(): Unit = { val client: MongoClient = MongoClientWrapper.getMongoClient() val database: MongoDatabase = client.getDatabase("bank") val accountColl = database.getCollection("account") accountColl.drop().await() val sessionOptions = ClientSessionOptions.builder().causallyConsistent(false).build() var sessionObservable: SingleObservable[ClientSession] = client.startSession(sessionOptions) accountColl.insertOne(Document("name" -> "Alice", "balance" -> 1000)).await() accountColl.insertOne(Document("name" -> "Bob", "balance" -> 1000)).await() var retry = true while (retry) { try { transferMoneyWithRetry(sessionObservable, database) println("transaction committed") retry = false } catch { case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => { println("retrying transaction") } case other: Throwable => { println("transaction failed") retry = false throw other } } } // check results outside of transaction assert(accountColl.find(Document("name" -> "Alice")).results().head.getInteger("balance") == 500) assert(accountColl.find(Document("name" -> "Bob")).results().head.getInteger("balance") == 1500) accountColl.drop().await() }
Python

Le code suivant montre comment utiliser l'API de transaction Amazon DocumentDB avec Python.

// Sample Python code with Core api import pymongo client = pymongo.MongoClient(<connection_string>) rc_snapshot = pymongo.read_concern.ReadConcern('snapshot') wc_majority = pymongo.write_concern.WriteConcern('majority') # To start, drop and create an account collection and insert balances for both Alice and Bob collection = client.get_database("bank").get_collection("account") collection.drop() collection.insert_one({"_id": 1, "name": "Alice", "balance": 1000}) collection.insert_one({"_id": 2, "name": "Bob", "balance": 1000}) amount_to_transfer = 500 # deduct 500 from Alice's account alice_balance = collection.find_one({"name": "Alice"}).get("balance") assert alice_balance >= amount_to_transfer new_alice_balance = alice_balance - amount_to_transfer with client.start_session({'causalConsistency':False}) as session: session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority) collection.update_one({"name": "Alice"}, {'$set': {"balance": new_alice_balance}}, session=session) session.commit_transaction() updated_alice_balance = collection.find_one({"name": "Alice"}).get("balance") assert updated_alice_balance == new_alice_balance # add 500 to Bob's account bob_balance = collection.find_one({"name": "Bob"}).get("balance") assert bob_balance >= amount_to_transfer new_bob_balance = bob_balance + amount_to_transfer with client.start_session({'causalConsistency':False}) as session: session.start_transaction(read_concern=rc_snapshot, write_concern=wc_majority) collection.update_one({"name": "Bob"}, {'$set': {"balance": new_bob_balance}}, session=session) session.commit_transaction() updated_bob_balance = collection.find_one({"name": "Bob"}).get("balance") assert updated_bob_balance == new_bob_balance

Commandes prises en charge

Command Pris en charge

abortTransaction

Oui

commitTransaction

Oui

endSessions

Oui

killSession

Oui

killAllSession

Oui

killAllSessionsByPattern

Non

refreshSessions

Non

startSession

Oui

Fonctionnalités non prises en charge

Méthodes Étapes ou commandes

db.collection.aggregate()

$collStats

$currentOp

$indexStats

$listSessions

$out

db.collection.count()

db.collection.countDocuments()

$where

$near

$nearSphere

db.collection.insert()

insertn'est pas pris en charge s'il n'est pas exécuté sur une collection existante. Cette méthode est prise en charge si elle cible une collection préexistante.

Séances

Les sessions MongoDB sont un framework utilisé pour prendre en charge les écritures réessayables, la cohérence causale, les transactions et pour gérer les opérations entre les bases de données. Lorsqu'une session est créée, un identifiant de session logique (lsid) est généré par le client et est utilisé pour étiqueter toutes les opérations de cette session lors de l'envoi de commandes au serveur.

Amazon DocumentDB prend en charge l'utilisation de sessions pour activer les transactions, mais ne prend pas en charge la cohérence causale ni les écritures réessayables.

Lorsque vous utilisez des transactions au sein d'Amazon DocumentDB, une transaction est initiée depuis une session à l'aide de l'session.startTransaction()API et une session prend en charge une seule transaction à la fois. De même, les transactions sont effectuées à l'aide du commit (session.commitTransaction()) ou de l'abort (session.abortTransaction()). APIs

Cohérence causale

La cohérence causale garantit qu'au cours d'une seule session client, le client observera la read-after-write cohérence, que les lectures/écritures monatomiques et les écritures suivront les lectures et ces garanties s'appliquent à toutes les instances d'un cluster, et pas seulement à l'instance principale. Amazon DocumentDB ne prend pas en charge la cohérence causale et la déclaration suivante provoquera une erreur.

var mySession = db.getMongo().startSession(); var mySessionObject = mySession.getDatabase('test').getCollection('account'); mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}}); //Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 } mySessionObject.find() //Error: error: { // "ok" : 0, // "code" : 303, // "errmsg" : "Feature not supported: 'causal consistency'", // "operationTime" : Timestamp(1603461817, 493214) //} mySession.endSession()

Vous pouvez désactiver la cohérence causale au sein d'une session. Veuillez noter que cela vous permettra d'utiliser le cadre de session, mais ne fournira aucune garantie de cohérence causale pour les lectures. Lorsque vous utilisez Amazon DocumentDB, les lectures depuis le serveur principal seront read-after-write cohérentes et les lectures depuis les instances de réplication seront finalement cohérentes. Les transactions constituent le principal cas d'utilisation des sessions.

var mySession = db.getMongo().startSession({causalConsistency: false}); var mySessionObject = mySession.getDatabase('test').getCollection('account'); mySessionObject.updateOne({"_id": 2}, {"$inc": {"balance": 400}}); //Result:{ "acknowledged" : true, "matchedCount" : 1, "modifiedCount" : 1 } mySessionObject.find() //{ "_id" : 1, "name" : "Bob", "balance" : 100 } //{ "_id" : 2, "name" : "Alice", "balance" : 1700 }

écritures réessayables

Les écritures réessayables sont une fonctionnalité dans laquelle le client tente de relancer les opérations d'écriture, une fois, lorsque des erreurs réseau se produisent ou s'il ne parvient pas à trouver l'écriture principale. Dans Amazon DocumentDB, les écritures réessayables ne sont pas prises en charge et doivent être désactivées. Vous pouvez le désactiver à l'aide de la commande (retryWrites=false) dans la chaîne de connexion.

Note

Si vous utilisez un ancien shell mongo (et non mongosh), n'incluez la retryWrites=false commande dans aucune chaîne de code. Par défaut, les écritures réessayables sont désactivées. L'inclusion retryWrites=false peut entraîner un échec des commandes de lecture normales.

Erreurs de transaction

Lorsque vous utilisez des transactions, certains scénarios peuvent générer une erreur indiquant qu'un numéro de transaction ne correspond à aucune transaction en cours.

L'erreur peut être générée dans au moins deux scénarios différents :

  • Après le délai d'expiration de la transaction d'une minute.

  • Après le redémarrage d'une instance (en raison d'un correctif, d'une restauration après un crash, etc.), il est possible de recevoir cette erreur même dans les cas où la transaction a été validée avec succès. Lors du redémarrage d'une instance, la base de données ne peut pas faire la différence entre une transaction terminée avec succès et une transaction abandonnée. En d'autres termes, l'état d'achèvement de la transaction est ambigu.

La meilleure façon de gérer cette erreur est de rendre les mises à jour transactionnelles idempotentes, par exemple en utilisant le $set mutateur au lieu d'une opération d'incrémentation/décrémentation. Voir ci-dessous :

{ "ok" : 0, "operationTime" : Timestamp(1603938167, 1), "code" : 251, "errmsg" : "Given transaction number 1 does not match any in-progress transactions." }