Navigation
This version of the documentation is archived and no longer supported.

Transactions

New in version 4.0.

In MongoDB, an operation on a single document is atomic. Because you can use embedded documents and arrays to capture relationships between data in a single document structure instead of normalizing across multiple documents and collections, this single-document atomicity obviates the need for multi-document transactions for many practical use cases.

However, for situations that require atomicity for updates to multiple documents or consistency between reads to multiple documents:

  • Starting in version 4.0, MongoDB provides the ability to perform multi-document transactions against replica sets.

Multi-document transactions can be used across multiple operations, collections, databases, and documents. Multi-document transactions provide an “all-or-nothing” proposition. When a transaction commits, all data changes made in the transaction are saved. If any operation in the transaction fails, the transaction aborts and all data changes made in the transaction are discarded without ever becoming visible. Until a transaction commits, no write operations in the transaction are visible outside the transaction.

Important

In most cases, multi-document transaction incurs a greater performance cost over single document writes, and the availability of multi-document transaction should not be a replacement for effective schema design. For many scenarios, the denormalized data model (embedded documents and arrays) will continue to be optimal for your data and use cases. That is, for many scenarios, modeling your data appropriately will minimize the need for multi-document transactions. For additional transactions usage considerations (such as runtime limit and oplog size limit), see also Production Considerations.

Transactions API

The following mongo shell example highlights the key components of using transactions:

Note

The example omits retry logic and robust error handling for simplicity’s sake. For a more practical example of incorporating transactions in applications, see Transactions in Applications instead.

// Start a session.
session = db.getMongo().startSession( { readPreference: { mode: "primary" } } );

employeesCollection = session.getDatabase("hr").employees;
eventsCollection = session.getDatabase("reporting").events;

// Start a transaction
session.startTransaction( { readConcern: { level: "snapshot" }, writeConcern: { w: "majority" } } );

// Operations inside the transaction
try {
   employeesCollection.updateOne( { employee: 3 }, { $set: { status: "Inactive" } } );
   eventsCollection.insertOne( { employee: 3, status: { new: "Inactive", old: "Active" } } );
} catch (error) {
   // Abort transaction on error
   session.abortTransaction();
   throw error;
}

// Commit the transaction using write concern set at transaction start
session.commitTransaction();

session.endSession();

Transactions and Sessions

Transactions are associated with a session. That is, you start a transaction for a session. At any given time, you can have at most one open transaction for a session.

Important

When using the drivers, you must pass the session to each operation in the transaction.

If a session ends and it has an open transaction, the transaction aborts.

Transactions and MongoDB Drivers

Clients require MongoDB drivers updated for MongoDB 4.0.

Java 3.8.0

Python 3.7.0

C 1.11.0

C# 2.7

Node 3.1.0

Ruby 2.6.0

Perl 2.0.0

PHP (PHPC) 1.5.0

Scala 2.4.0

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction. For examples, see Transactions in Applications.

Transactions and the mongo Shell

The following mongo shell methods are available for transactions:

Transactions and Atomicity

Multi-document transactions are atomic:

  • When a transaction commits, all data changes made in the transaction are saved and visible outside the transaction. Until a transaction commits, the data changes made in the transaction are not visible outside the transaction.
  • When a transaction aborts, all data changes made in the transaction are discarded without ever becoming visible. For example, if any operation in the transaction fails, the transaction aborts and all data changes made in the transaction are discarded without ever becoming visible.

Transactions and Operations

For transactions:

  • You can specify read/write (CRUD) operations on existing collections. The collections can be in different databases. For a list of CRUD operations, see CRUD Operations.
  • You cannot read/write to collections in the config, admin, or local databases.
  • You cannot write to system.* collections.
  • You cannot return the supported operation’s query plan (i.e. explain).
  • For cursors created outside of transactions, you cannot call getMore inside a transaction.
  • For cursors created in a transaction, you cannot call getMore outside the transaction.

Operations that affect the database catalog, such as creating or dropping a collection or an index, are not allowed in multi-document transactions. For example, a multi-document transaction cannot include an insert operation that would result in the creation of a new collection. See Restricted Operations.

Tip

When creating or dropping a collection immediately before starting a transaction, if the collection is accessed within the transaction, issue the create or drop operation with write concern "majority" to ensure that the transaction can acquire the required locks.

Count Operation

To perform a count operation within a transaction, use the $count aggregation stage or the $group (with a $sum expression) aggregation stage.

MongoDB drivers compatible with the 4.0 features provide a collection-level API countDocuments(filter, options) as a helper method that uses the $group with a $sum expression to perform a count. The 4.0 drivers have deprecated the count() API.

Starting in MongoDB 4.0.3, the mongo shell provides the db.collection.countDocuments() helper method that uses the $group with a $sum expression to perform a count.

Informational Operations

Informational commands, such as hello, buildInfo, connectionStatus (and their helper methods) are allowed in transactions; however, they cannot be the first operation in the transaction.

Restricted Operations

The following operations are not allowed in multi-document transactions:

  • Operations that affect the database catalog, such as creating or dropping a collection or an index. For example, a multi-document transaction cannot include an insert operation that would result in the creation of a new collection.

    The listCollections and listIndexes commands and their helper methods are also excluded.

  • Non-CRUD and non-informational operations, such as createUser, getParameter, count, etc. and their helpers.

Transactions in Applications

Highly Available Applications

Regardless of the database system, whether MongoDB or relational databases, applications should take measures to handle errors during transaction commits and incorporate retry logic for transactions.

Retry Transaction

The individual write operations inside the transaction are not retryable, regardless of whether retryWrites is set to true.

If an operation encounters an error, the returned error may have an errorLabels array field. If the error is a transient error, the errorLabels array field contains "TransientTransactionError" as an element and the transaction as a whole can be retried.

For example, the following helper runs a function and retries the function if a "TransientTransactionError" is encountered:

// Runs the txnFunc and retries if TransientTransactionError encountered

function runTransactionWithRetry(txnFunc, session) {
    while (true) {
        try {
            txnFunc(session);  // performs transaction
            break;
        } catch (error) {
            print("Transaction aborted. Caught exception during transaction.");

            // If transient error, retry the whole transaction
            if ( error.hasOwnProperty("errorLabels") && error.errorLabels.includes( "TransientTransactionError")  ) {
                print("TransientTransactionError, retrying transaction ...");
                continue;
            } else {
                throw error;
            }
        }
    }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

def run_transaction_with_retry(txn_func, session):
    while True:
        try:
            txn_func(session)  # performs transaction
            break
        except (ConnectionFailure, OperationFailure) as exc:
            print("Transaction aborted. Caught exception during "
                  "transaction.")

            # If transient error, retry the whole transaction
            if exc.has_error_label("TransientTransactionError"):
                print("TransientTransactionError, retrying"
                      "transaction ...")
                continue
            else:
                raise

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

void runTransactionWithRetry(Runnable transactional) {
    while (true) {
        try {
            transactional.run();
            break;
        } catch (MongoException e) {
            System.out.println("Transaction aborted. Caught exception during transaction.");

            if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
                System.out.println("TransientTransactionError, aborting transaction and retrying ...");
                continue;
            } else {
                throw e;
            }
        }
    }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

async function runTransactionWithRetry(txnFunc, client, session) {
  try {
    await txnFunc(client, session);
  } catch (error) {
    console.log('Transaction aborted. Caught exception during transaction.');

    // If transient error, retry the whole transaction
    if (error.hasErrorLabel('TransientTransactionError')) {
      console.log('TransientTransactionError, retrying transaction ...');
      await runTransactionWithRetry(txnFunc, client, session);
    } else {
      throw error;
    }
  }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

private function runTransactionWithRetry1(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session)
{
    while (true) {
        try {
            $txnFunc($client, $session);  // performs transaction
            break;
        } catch (\MongoDB\Driver\Exception\CommandException $error) {
            $resultDoc = $error->getResultDocument();
            echo "Transaction aborted. Caught exception during transaction.\n";

            // If transient error, retry the whole transaction
            if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) {
                echo "TransientTransactionError, retrying transaction ...\n";
                continue;
            } else {
                throw $error;
            }
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            throw $error;
        }
    }
}
bool
run_transaction_with_retry (txn_func_t txn_func,
                            mongoc_client_session_t *cs,
                            bson_error_t *error)
{
   bson_t reply;
   bool r;

   while (true) {
      /* perform transaction */
      r = txn_func (cs, &reply, error);
      if (r) {
         /* success */
         bson_destroy (&reply);
         return true;
      }

      MONGOC_WARNING ("Transaction aborted: %s", error->message);
      if (mongoc_error_has_label (&reply, "TransientTransactionError")) {
         /* on transient error, retry the whole transaction */
         MONGOC_WARNING ("TransientTransactionError, retrying transaction...");
         bson_destroy (&reply);
      } else {
         /* non-transient error */
         break;
      }
   }

   bson_destroy (&reply);
   return false;
}


using transaction_func = std::function<void(client_session & session)>;
auto run_transaction_with_retry = [](transaction_func txn_func, client_session& session) {
    while (true) {
        try {
            txn_func(session);  // performs transaction.
            break;
        } catch (const operation_exception& oe) {
            std::cout << "Transaction aborted. Caught exception during transaction."
                      << std::endl;
            // If transient error, retry the whole transaction.
            if (oe.has_error_label("TransientTransactionError")) {
                std::cout << "TransientTransactionError, retrying transaction ..."
                          << std::endl;
                continue;
            } else {
                throw oe;
            }
        }
    }
};
public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session)
{
    while (true)
    {
        try
        {
            txnFunc(client, session); // performs transaction
            break;
        }
        catch (MongoException exception)
        {
            Console.WriteLine($"Transaction aborted. Caught exception during transaction: ${exception.Message}.");

            // if transient error, retry the whole transaction
            if (exception.HasErrorLabel("TransientTransactionError"))
            {
                Console.WriteLine("TransientTransactionError, retrying transaction.");
                continue;
            }
            else
            {
                throw;
            }  
        }
    }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

sub runTransactionWithRetry {
    my ( $txnFunc, $session ) = @_;

    LOOP: {
        eval {
            $txnFunc->($session); # performs transaction
        };
        if ( my $error = $@ ) {
            print("Transaction aborted-> Caught exception during transaction.\n");
            # If transient error, retry the whole transaction
            if ( $error->has_error_label("TransientTransactionError") ) {
                print("TransientTransactionError, retrying transaction ->..\n");
                redo LOOP;
            }
            else {
                die $error;
            }
        }
    }

    return;
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

def run_transaction_with_retry(session)
  begin
    yield session # performs transaction
  rescue Mongo::Error => e

    puts 'Transaction aborted. Caught exception during transaction.'
    raise unless e.label?(Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL)

    puts "#{Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL}, retrying transaction ..."
    retry
  end
end

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

/*
 * Copyright 2008-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.mongodb.scala.documentation

import org.mongodb.scala.TestMongoClientHelper.hasSingleHost
import org.mongodb.scala._
import org.mongodb.scala.model.{ Filters, Updates }
import org.mongodb.scala.result.{ InsertOneResult, UpdateResult }

import scala.concurrent.Await

//scalastyle:off magic.number regex
class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {

  // Implicit functions that execute the Observable and return the results
  implicit class ObservableExecutor[T](observable: Observable[T]) {
    def execute(): Seq[T] = Await.result(observable.toFuture(), WAIT_DURATION)
  }

  implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
    def execute(): T = Await.result(observable.toFuture(), WAIT_DURATION)
  }
  // end implicit functions

  "The Scala driver" should "be able to commit a transaction" in withClient { client =>
    assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost)
    client.getDatabase("hr").drop().execute()
    client.getDatabase("hr").createCollection("employees").execute()
    client.getDatabase("hr").createCollection("events").execute()

    updateEmployeeInfoWithRetry(client).execute()
    client.getDatabase("hr").drop().execute()
  }

  def updateEmployeeInfo(
      database: MongoDatabase,
      observable: SingleObservable[ClientSession]
  ): SingleObservable[ClientSession] = {
    observable.map(clientSession => {
      val employeesCollection = database.getCollection("employees")
      val eventsCollection = database.getCollection("events")

      val transactionOptions = TransactionOptions
        .builder()
        .readPreference(ReadPreference.primary())
        .readConcern(ReadConcern.SNAPSHOT)
        .writeConcern(WriteConcern.MAJORITY)
        .build()
      clientSession.startTransaction(transactionOptions)
      employeesCollection
        .updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
        .subscribe((res: UpdateResult) => println(res))
      eventsCollection
        .insertOne(
          clientSession,
          Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active"))
        )
        .subscribe((res: InsertOneResult) => ())

      clientSession
    })
  }

  def commitAndRetry(observable: SingleObservable[Void]): SingleObservable[Void] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
        println("UnknownTransactionCommitResult, retrying commit operation ...")
        commitAndRetry(observable)
      }
      case e: Exception => {
        println(s"Exception during commit ...: $e")
        throw e
      }
    })
  }

  def runTransactionAndRetry(observable: SingleObservable[Void]): SingleObservable[Void] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
        println("TransientTransactionError, aborting transaction and retrying ...")
        runTransactionAndRetry(observable)
      }
    })
  }

  def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Void] = {

    val database = client.getDatabase("hr")
    val updateEmployeeInfoObservable: SingleObservable[ClientSession] =
      updateEmployeeInfo(database, client.startSession())
    val commitTransactionObservable: SingleObservable[Void] =
      updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
    val commitAndRetryObservable: SingleObservable[Void] = commitAndRetry(commitTransactionObservable)

    runTransactionAndRetry(commitAndRetryObservable)
  }
}
// RunTransactionWithRetry is an example function demonstrating transaction retry logic.
func RunTransactionWithRetry(sctx mongo.SessionContext, txnFn func(mongo.SessionContext) error) error {
	for {
		err := txnFn(sctx) // Performs transaction.
		if err == nil {
			return nil
		}

		log.Println("Transaction aborted. Caught exception during transaction.")

		// If transient error, retry the whole transaction
		if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
			log.Println("TransientTransactionError, retrying transaction...")
			continue
		}
		return err
	}
}

Retry Commit Operation

The commit operations are retryable write operations. If the commit operation encounters an error, MongoDB drivers retry the operation a single time regardless of whether retryWrites is set to true.

If the commit operation encounters an error, MongoDB returns an error with an errorLabels array field. If the error is a transient commit error, the errorLabels array field contains "UnknownTransactionCommitResult" as an element and the commit operation can be retried.

In addition to the single retry behavior provided by the MongoDB drivers, applications should take measures to handle "UnknownTransactionCommitResult" errors during transaction commits.

For example, the following helper commits a transaction and retries if a "UnknownTransactionCommitResult" is encountered:

// Retries commit if UnknownTransactionCommitResult encountered

function commitWithRetry(session) {
    while (true) {
        try {
            session.commitTransaction(); // Uses write concern set at transaction start.
            print("Transaction committed.");
            break;
        } catch (error) {
            // Can retry commit
            if (error.hasOwnProperty("errorLabels") && error.errorLabels.includes( "UnknownTransactionCommitResult") ) {
                print("UnknownTransactionCommitResult, retrying commit operation ...");
                continue;
            } else {
                print("Error during commit ...");
                throw error;
            }
       }
    }
}
def commit_with_retry(session):
    while True:
        try:
            # Commit uses write concern set at transaction start.
            session.commit_transaction()
            print("Transaction committed.")
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # Can retry commit
            if exc.has_error_label("UnknownTransactionCommitResult"):
                print("UnknownTransactionCommitResult, retrying "
                      "commit operation ...")
                continue
            else:
                print("Error during commit ...")
                raise
void commitWithRetry(ClientSession clientSession) {
    while (true) {
        try {
            clientSession.commitTransaction();
            System.out.println("Transaction committed");
            break;
        } catch (MongoException e) {
            // can retry commit
            if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
                System.out.println("UnknownTransactionCommitResult, retrying commit operation ...");
                continue;
            } else {
                System.out.println("Exception during commit ...");
                throw e;
            }
        }
    }
}
async function commitWithRetry(session) {
  try {
    await session.commitTransaction();
    console.log('Transaction committed.');
  } catch (error) {
    if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
      console.log('UnknownTransactionCommitResult, retrying commit operation ...');
      await commitWithRetry(session);
    } else {
      console.log('Error during commit ...');
      throw error;
    }
  }
}
private function commitWithRetry2(\MongoDB\Driver\Session $session)
{
    while (true) {
        try {
            $session->commitTransaction();
            echo "Transaction committed.\n";
            break;
        } catch (\MongoDB\Driver\Exception\CommandException $error) {
            $resultDoc = $error->getResultDocument();

            if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) {
                echo "UnknownTransactionCommitResult, retrying commit operation ...\n";
                continue;
            } else {
                echo "Error during commit ...\n";
                throw $error;
            }
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            echo "Error during commit ...\n";
            throw $error;
        }
    }
}
bool
commit_with_retry (mongoc_client_session_t *cs, bson_error_t *error)
{
   bson_t reply;
   bool r;

   while (true) {
      /* commit uses write concern set at transaction start, see
       * mongoc_transaction_opts_set_write_concern */
      r = mongoc_client_session_commit_transaction (cs, &reply, error);
      if (r) {
         MONGOC_INFO ("Transaction committed");
         break;
      }

      if (mongoc_error_has_label (&reply, "UnknownTransactionCommitResult")) {
         MONGOC_WARNING ("UnknownTransactionCommitResult, retrying commit ...");
         bson_destroy (&reply);
      } else {
         /* commit failed, cannot retry */
         break;
      }
   }

   bson_destroy (&reply);

   return r;
}


auto commit_with_retry = [](client_session& session) {
    while (true) {
        try {
            session.commit_transaction();  // Uses write concern set at transaction start.
            std::cout << "Transaction committed." << std::endl;
            break;
        } catch (const operation_exception& oe) {
            // Can retry commit
            if (oe.has_error_label("UnknownTransactionCommitResult")) {
                std::cout << "UnknownTransactionCommitResult, retrying commit operation ..."
                          << std::endl;
                continue;
            } else {
                std::cout << "Error during commit ..." << std::endl;
                throw oe;
            }
        }
    }
};
public void CommitWithRetry(IClientSessionHandle session)
{
    while (true)
    {
        try
        {
            session.CommitTransaction(); // uses write concern set at transaction start
            Console.WriteLine("Transaction committed.");
            break;
        }
        catch (MongoException exception)
        {
            // can retry commit
            if (exception.HasErrorLabel("UnknwonTransactionCommitResult"))
            {
                Console.WriteLine("UnknownTransactionCommitResult, retrying commit operation.");
                continue;
            }
            else
            {
                Console.WriteLine($"Excpetion during commit: {exception.Message}.");
                throw;
            }
        }
    }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

sub commitWithRetry {
    my ($session) = @_;

    LOOP: {
        eval {
            $session->commit_transaction(); # Uses write concern set at transaction start.
            print("Transaction committed->\n");
        };
        if ( my $error = $@ ) {
            # Can retry commit
            if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
                print("UnknownTransactionCommitResult, retrying commit operation ->..\n");
                redo LOOP;
            }
            else {
                print("Error during commit ->..\n");
                die $error;
            }
        }
    }

    return;
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

def commit_with_retry(session)
  begin
    session.commit_transaction
    puts 'Transaction committed.'
  rescue Mongo::Error=> e
    if e.label?(Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)
      puts "#{Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL}, retrying commit operation..."
      retry
    else
      puts 'Error during commit ...'
      raise
    end
  end
end

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

/*
 * Copyright 2008-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.mongodb.scala.documentation

import org.mongodb.scala.TestMongoClientHelper.hasSingleHost
import org.mongodb.scala._
import org.mongodb.scala.model.{ Filters, Updates }
import org.mongodb.scala.result.{ InsertOneResult, UpdateResult }

import scala.concurrent.Await

//scalastyle:off magic.number regex
class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {

  // Implicit functions that execute the Observable and return the results
  implicit class ObservableExecutor[T](observable: Observable[T]) {
    def execute(): Seq[T] = Await.result(observable.toFuture(), WAIT_DURATION)
  }

  implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
    def execute(): T = Await.result(observable.toFuture(), WAIT_DURATION)
  }
  // end implicit functions

  "The Scala driver" should "be able to commit a transaction" in withClient { client =>
    assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost)
    client.getDatabase("hr").drop().execute()
    client.getDatabase("hr").createCollection("employees").execute()
    client.getDatabase("hr").createCollection("events").execute()

    updateEmployeeInfoWithRetry(client).execute()
    client.getDatabase("hr").drop().execute()
  }

  def updateEmployeeInfo(
      database: MongoDatabase,
      observable: SingleObservable[ClientSession]
  ): SingleObservable[ClientSession] = {
    observable.map(clientSession => {
      val employeesCollection = database.getCollection("employees")
      val eventsCollection = database.getCollection("events")

      val transactionOptions = TransactionOptions
        .builder()
        .readPreference(ReadPreference.primary())
        .readConcern(ReadConcern.SNAPSHOT)
        .writeConcern(WriteConcern.MAJORITY)
        .build()
      clientSession.startTransaction(transactionOptions)
      employeesCollection
        .updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
        .subscribe((res: UpdateResult) => println(res))
      eventsCollection
        .insertOne(
          clientSession,
          Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active"))
        )
        .subscribe((res: InsertOneResult) => ())

      clientSession
    })
  }

  def commitAndRetry(observable: SingleObservable[Void]): SingleObservable[Void] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
        println("UnknownTransactionCommitResult, retrying commit operation ...")
        commitAndRetry(observable)
      }
      case e: Exception => {
        println(s"Exception during commit ...: $e")
        throw e
      }
    })
  }

  def runTransactionAndRetry(observable: SingleObservable[Void]): SingleObservable[Void] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
        println("TransientTransactionError, aborting transaction and retrying ...")
        runTransactionAndRetry(observable)
      }
    })
  }

  def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Void] = {

    val database = client.getDatabase("hr")
    val updateEmployeeInfoObservable: SingleObservable[ClientSession] =
      updateEmployeeInfo(database, client.startSession())
    val commitTransactionObservable: SingleObservable[Void] =
      updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
    val commitAndRetryObservable: SingleObservable[Void] = commitAndRetry(commitTransactionObservable)

    runTransactionAndRetry(commitAndRetryObservable)
  }
}
// CommitWithRetry is an example function demonstrating transaction commit with retry logic.
func CommitWithRetry(sctx mongo.SessionContext) error {
	for {
		err := sctx.CommitTransaction(sctx)
		switch e := err.(type) {
		case nil:
			log.Println("Transaction committed.")
			return nil
		case mongo.CommandError:
			// Can retry commit
			if e.HasErrorLabel("UnknownTransactionCommitResult") {
				log.Println("UnknownTransactionCommitResult, retrying commit operation...")
				continue
			}
			log.Println("Error during commit...")
			return e
		default:
			log.Println("Error during commit...")
			return e
		}
	}
}

Retry Transaction and Commit Operation

Incorporating logic to retrying the transaction for transient errors and retrying the commit, the full code example is:

// Runs the txnFunc and retries if TransientTransactionError encountered

function runTransactionWithRetry(txnFunc, session) {
    while (true) {
        try {
            txnFunc(session);  // performs transaction
            break;
        } catch (error) {
            // If transient error, retry the whole transaction
            if ( error.hasOwnProperty("errorLabels") && error.errorLabels.includes("TransientTransactionError")  ) {
                print("TransientTransactionError, retrying transaction ...");
                continue;
            } else {
                throw error;
            }
        }
    }
}

// Retries commit if UnknownTransactionCommitResult encountered

function commitWithRetry(session) {
    while (true) {
        try {
            session.commitTransaction(); // Uses write concern set at transaction start.
            print("Transaction committed.");
            break;
        } catch (error) {
            // Can retry commit
            if (error.hasOwnProperty("errorLabels") && error.errorLabels.includes("UnknownTransactionCommitResult") ) {
                print("UnknownTransactionCommitResult, retrying commit operation ...");
                continue;
            } else {
                print("Error during commit ...");
                throw error;
            }
       }
    }
}

// Updates two collections in a transactions

function updateEmployeeInfo(session) {
    employeesCollection = session.getDatabase("hr").employees;
    eventsCollection = session.getDatabase("reporting").events;

    session.startTransaction( { readConcern: { level: "snapshot" }, writeConcern: { w: "majority" } } );

    try{
        employeesCollection.updateOne( { employee: 3 }, { $set: { status: "Inactive" } } );
        eventsCollection.insertOne( { employee: 3, status: { new: "Inactive", old: "Active" } } );
    } catch (error) {
        print("Caught exception during transaction, aborting.");
        session.abortTransaction();
        throw error;
    }

    commitWithRetry(session);
}

// Start a session.
session = db.getMongo().startSession( { readPreference: { mode: "primary" } } );

try{
   runTransactionWithRetry(updateEmployeeInfo, session);
} catch (error) {
   // Do something with error
} finally {
   session.endSession();
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.


def run_transaction_with_retry(txn_func, session):
    while True:
        try:
            txn_func(session)  # performs transaction
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # If transient error, retry the whole transaction
            if exc.has_error_label("TransientTransactionError"):
                print("TransientTransactionError, retrying "
                      "transaction ...")
                continue
            else:
                raise

def commit_with_retry(session):
    while True:
        try:
            # Commit uses write concern set at transaction start.
            session.commit_transaction()
            print("Transaction committed.")
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # Can retry commit
            if exc.has_error_label("UnknownTransactionCommitResult"):
                print("UnknownTransactionCommitResult, retrying "
                      "commit operation ...")
                continue
            else:
                print("Error during commit ...")
                raise

# Updates two collections in a transactions

def update_employee_info(session):
    employees_coll = session.client.hr.employees
    events_coll = session.client.reporting.events

    with session.start_transaction(
            read_concern=ReadConcern("snapshot"),
            write_concern=WriteConcern(w="majority")):
        employees_coll.update_one(
            {"employee": 3}, {"$set": {"status": "Inactive"}},
            session=session)
        events_coll.insert_one(
            {"employee": 3, "status": {
                "new": "Inactive", "old": "Active"}},
            session=session)

        commit_with_retry(session)

# Start a session.
with client.start_session() as session:
    try:
        run_transaction_with_retry(update_employee_info, session)
    except Exception as exc:
        # Do something with error.
        raise

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

void runTransactionWithRetry(Runnable transactional) {
    while (true) {
        try {
            transactional.run();
            break;
        } catch (MongoException e) {
            System.out.println("Transaction aborted. Caught exception during transaction.");

            if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
                System.out.println("TransientTransactionError, aborting transaction and retrying ...");
                continue;
            } else {
                throw e;
            }
        }
    }
}

void commitWithRetry(ClientSession clientSession) {
    while (true) {
        try {
            clientSession.commitTransaction();
            System.out.println("Transaction committed");
            break;
        } catch (MongoException e) {
            // can retry commit
            if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
                System.out.println("UnknownTransactionCommitResult, retrying commit operation ...");
                continue;
            } else {
                System.out.println("Exception during commit ...");
                throw e;
            }
        }
    }
}

void updateEmployeeInfo() {

    MongoCollection<Document> employeesCollection = client.getDatabase("hr").getCollection("employees");
    MongoCollection<Document> eventsCollection = client.getDatabase("reporting").getCollection("events");

    TransactionOptions txnOptions = TransactionOptions.builder()
            .readPreference(ReadPreference.primary())
            .readConcern(ReadConcern.MAJORITY)
            .writeConcern(WriteConcern.MAJORITY)
            .build();

    try (ClientSession clientSession = client.startSession()) {
        clientSession.startTransaction(txnOptions);

        employeesCollection.updateOne(clientSession,
                Filters.eq("employee", 3),
                Updates.set("status", "Inactive"));
        eventsCollection.insertOne(clientSession,
                new Document("employee", 3).append("status", new Document("new", "Inactive").append("old", "Active")));

        commitWithRetry(clientSession);
    }
}


void updateEmployeeInfoWithRetry() {
    runTransactionWithRetry(this::updateEmployeeInfo);
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

async function commitWithRetry(session) {
  try {
    await session.commitTransaction();
    console.log('Transaction committed.');
  } catch (error) {
    if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
      console.log('UnknownTransactionCommitResult, retrying commit operation ...');
      await commitWithRetry(session);
    } else {
      console.log('Error during commit ...');
      throw error;
    }
  }
}

async function runTransactionWithRetry(txnFunc, client, session) {
  try {
    await txnFunc(client, session);
  } catch (error) {
    console.log('Transaction aborted. Caught exception during transaction.');

    // If transient error, retry the whole transaction
    if (error.hasErrorLabel('TransientTransactionError')) {
      console.log('TransientTransactionError, retrying transaction ...');
      await runTransactionWithRetry(txnFunc, client, session);
    } else {
      throw error;
    }
  }
}

async function updateEmployeeInfo(client, session) {
  session.startTransaction({
    readConcern: { level: 'snapshot' },
    writeConcern: { w: 'majority' },
    readPreference: 'primary'
  });

  const employeesCollection = client.db('hr').collection('employees');
  const eventsCollection = client.db('reporting').collection('events');

  await employeesCollection.updateOne(
    { employee: 3 },
    { $set: { status: 'Inactive' } },
    { session }
  );
  await eventsCollection.insertOne(
    {
      employee: 3,
      status: { new: 'Inactive', old: 'Active' }
    },
    { session }
  );

  try {
    await commitWithRetry(session);
  } catch (error) {
    await session.abortTransaction();
    throw error;
  }
}

return client.withSession(session =>
  runTransactionWithRetry(updateEmployeeInfo, client, session)
);

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

private function runTransactionWithRetry3(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session)
{
    while (true) {
        try {
            $txnFunc($client, $session);  // performs transaction
            break;
        } catch (\MongoDB\Driver\Exception\CommandException $error) {
            $resultDoc = $error->getResultDocument();

            // If transient error, retry the whole transaction
            if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) {
                continue;
            } else {
                throw $error;
            }
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            throw $error;
        }
    }
}

private function commitWithRetry3(\MongoDB\Driver\Session $session)
{
    while (true) {
        try {
            $session->commitTransaction();
            echo "Transaction committed.\n";
            break;
        } catch (\MongoDB\Driver\Exception\CommandException $error) {
            $resultDoc = $error->getResultDocument();

            if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) {
                echo "UnknownTransactionCommitResult, retrying commit operation ...\n";
                continue;
            } else {
                echo "Error during commit ...\n";
                throw $error;
            }
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            echo "Error during commit ...\n";
            throw $error;
        }
    }
}

private function updateEmployeeInfo3(\MongoDB\Client $client, \MongoDB\Driver\Session $session)
{
    $session->startTransaction([
        'readConcern' => new \MongoDB\Driver\ReadConcern("snapshot"),
        'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY)
    ]);

    try {
        $client->hr->employees->updateOne(
            ['employee' => 3],
            ['$set' => ['status' => 'Inactive']],
            ['session' => $session]
        );
        $client->reporting->events->insertOne(
            ['employee' => 3, 'status' => [ 'new' => 'Inactive', 'old' => 'Active']],
            ['session' => $session]
        );
    } catch (\MongoDB\Driver\Exception\Exception $error) {
        echo "Caught exception during transaction, aborting.\n";
        $session->abortTransaction();
        throw $error;
    }

    $this->commitWithRetry3($session);
}

private function doUpdateEmployeeInfo(\MongoDB\Client $client)
{
    // Start a session.
    $session = $client->startSession();

    try {
        $this->runTransactionWithRetry3([$this, 'updateEmployeeInfo3'], $client, $session);
    } catch (\MongoDB\Driver\Exception\Exception $error) {
        // Do something with error
    }
}
/* takes a session, an out-param for server reply, and out-param for error. */
typedef bool (*txn_func_t) (mongoc_client_session_t *,
                            bson_t *,
                            bson_error_t *);


/* runs transactions with retry logic */
bool
run_transaction_with_retry (txn_func_t txn_func,
                            mongoc_client_session_t *cs,
                            bson_error_t *error)
{
   bson_t reply;
   bool r;

   while (true) {
      /* perform transaction */
      r = txn_func (cs, &reply, error);
      if (r) {
         /* success */
         bson_destroy (&reply);
         return true;
      }

      MONGOC_WARNING ("Transaction aborted: %s", error->message);
      if (mongoc_error_has_label (&reply, "TransientTransactionError")) {
         /* on transient error, retry the whole transaction */
         MONGOC_WARNING ("TransientTransactionError, retrying transaction...");
         bson_destroy (&reply);
      } else {
         /* non-transient error */
         break;
      }
   }

   bson_destroy (&reply);
   return false;
}


/* commit transactions with retry logic */
bool
commit_with_retry (mongoc_client_session_t *cs, bson_error_t *error)
{
   bson_t reply;
   bool r;

   while (true) {
      /* commit uses write concern set at transaction start, see
       * mongoc_transaction_opts_set_write_concern */
      r = mongoc_client_session_commit_transaction (cs, &reply, error);
      if (r) {
         MONGOC_INFO ("Transaction committed");
         break;
      }

      if (mongoc_error_has_label (&reply, "UnknownTransactionCommitResult")) {
         MONGOC_WARNING ("UnknownTransactionCommitResult, retrying commit ...");
         bson_destroy (&reply);
      } else {
         /* commit failed, cannot retry */
         break;
      }
   }

   bson_destroy (&reply);

   return r;
}


/* updates two collections in a transaction and calls commit_with_retry */
bool
update_employee_info (mongoc_client_session_t *cs,
                      bson_t *reply,
                      bson_error_t *error)
{
   mongoc_client_t *client;
   mongoc_collection_t *employees;
   mongoc_collection_t *events;
   mongoc_read_concern_t *rc;
   mongoc_write_concern_t *wc;
   mongoc_transaction_opt_t *txn_opts;
   bson_t opts = BSON_INITIALIZER;
   bson_t *filter = NULL;
   bson_t *update = NULL;
   bson_t *event = NULL;
   bool r;

   bson_init (reply);

   client = mongoc_client_session_get_client (cs);
   employees = mongoc_client_get_collection (client, "hr", "employees");
   events = mongoc_client_get_collection (client, "reporting", "events");

   rc = mongoc_read_concern_new ();
   mongoc_read_concern_set_level (rc, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
   wc = mongoc_write_concern_new ();
   mongoc_write_concern_set_w (wc, MONGOC_WRITE_CONCERN_W_MAJORITY);
   txn_opts = mongoc_transaction_opts_new ();
   mongoc_transaction_opts_set_read_concern (txn_opts, rc);
   mongoc_transaction_opts_set_write_concern (txn_opts, wc);

   r = mongoc_client_session_start_transaction (cs, txn_opts, error);
   if (!r) {
      goto done;
   }

   r = mongoc_client_session_append (cs, &opts, error);
   if (!r) {
      goto done;
   }

   filter = BCON_NEW ("employee", BCON_INT32 (3));
   update = BCON_NEW ("$set", "{", "status", "Inactive", "}");
   /* mongoc_collection_update_one will reinitialize reply */
   bson_destroy (reply);
   r = mongoc_collection_update_one (
      employees, filter, update, &opts, reply, error);

   if (!r) {
      goto abort;
   }

   event = BCON_NEW ("employee", BCON_INT32 (3));
   BCON_APPEND (event, "status", "{", "new", "Inactive", "old", "Active", "}");

   bson_destroy (reply);
   r = mongoc_collection_insert_one (events, event, &opts, reply, error);
   if (!r) {
      goto abort;
   }

   r = commit_with_retry (cs, error);

abort:
   if (!r) {
      MONGOC_ERROR ("Aborting due to error in transaction: %s", error->message);
      mongoc_client_session_abort_transaction (cs, NULL);
   }

done:
   mongoc_collection_destroy (employees);
   mongoc_collection_destroy (events);
   mongoc_read_concern_destroy (rc);
   mongoc_write_concern_destroy (wc);
   mongoc_transaction_opts_destroy (txn_opts);
   bson_destroy (&opts);
   bson_destroy (filter);
   bson_destroy (update);
   bson_destroy (event);

   return r;
}


void
example_func (mongoc_client_t *client)
{
   mongoc_client_session_t *cs;
   bson_error_t error;
   bool r;

   cs = mongoc_client_start_session (client, NULL, &error);
   if (!cs) {
      MONGOC_ERROR ("Could not start session: %s", error.message);
      return;
   }

   r = run_transaction_with_retry (update_employee_info, cs, &error);
   if (!r) {
      MONGOC_ERROR ("Could not update employee, permanent error: %s",
                    error.message);
   }

   mongoc_client_session_destroy (cs);
}
using transaction_func = std::function<void(client_session & session)>;
auto run_transaction_with_retry = [](transaction_func txn_func, client_session& session) {
    while (true) {
        try {
            txn_func(session);  // performs transaction.
            break;
        } catch (const operation_exception& oe) {
            std::cout << "Transaction aborted. Caught exception during transaction."
                      << std::endl;
            // If transient error, retry the whole transaction.
            if (oe.has_error_label("TransientTransactionError")) {
                std::cout << "TransientTransactionError, retrying transaction ..."
                          << std::endl;
                continue;
            } else {
                throw oe;
            }
        }
    }
};

auto commit_with_retry = [](client_session& session) {
    while (true) {
        try {
            session.commit_transaction();  // Uses write concern set at transaction start.
            std::cout << "Transaction committed." << std::endl;
            break;
        } catch (const operation_exception& oe) {
            // Can retry commit
            if (oe.has_error_label("UnknownTransactionCommitResult")) {
                std::cout << "UnknownTransactionCommitResult, retrying commit operation ..."
                          << std::endl;
                continue;
            } else {
                std::cout << "Error during commit ..." << std::endl;
                throw oe;
            }
        }
    }
};

// Updates two collections in a transaction
auto update_employee_info = [&](client_session& session) {
    auto& client = session.client();
    auto employees = client["hr"]["employees"];
    auto events = client["reporting"]["events"];

    options::transaction txn_opts;
    read_concern rc;
    rc.acknowledge_level(read_concern::level::k_snapshot);
    txn_opts.read_concern(rc);
    write_concern wc;
    wc.acknowledge_level(write_concern::level::k_majority);
    txn_opts.write_concern(wc);

    session.start_transaction(txn_opts);

    try {
        employees.update_one(
            make_document(kvp("employee", 3)),
            make_document(kvp("$set", make_document(kvp("status", "Inactive")))));
        events.insert_one(make_document(
            kvp("employee", 3),
            kvp("status", make_document(kvp("new", "Inactive"), kvp("old", "Active")))));
    } catch (const operation_exception& oe) {
        std::cout << "Caught exception during transaction, aborting." << std::endl;
        session.abort_transaction();
        throw oe;
    }

    commit_with_retry(session);
};

auto session = client.start_session();
try {
    run_transaction_with_retry(update_employee_info, session);
} catch (const operation_exception& oe) {
    // Do something with error.
    throw oe;
}
public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session)
{
    while (true)
    {
        try
        {
            txnFunc(client, session); // performs transaction
            break;
        }
        catch (MongoException exception)
        {
            // if transient error, retry the whole transaction
            if (exception.HasErrorLabel("TransientTransactionError"))
            {
                Console.WriteLine("TransientTransactionError, retrying transaction.");
                continue;
            }
            else
            {
                throw;
            }
        }
    }
}

public void CommitWithRetry(IClientSessionHandle session)
{
    while (true)
    {
        try
        {
            session.CommitTransaction();
            Console.WriteLine("Transaction committed.");
            break;
        }
        catch (MongoException exception)
        {
            // can retry commit
            if (exception.HasErrorLabel("UnknownTransactionCommitResult"))
            {
                Console.WriteLine("UnknwonTransactionCommiResult, retrying commit operation");
                continue;
            }
            else
            {
                Console.WriteLine($"Error during commit: {exception.Message}.");
                throw;
            }
        }
    }
}

// updates two collections in a transaction
public void UpdateEmployeeInfo(IMongoClient client, IClientSessionHandle session)
{
    var employeesCollection = client.GetDatabase("hr").GetCollection<BsonDocument>("employees");
    var eventsCollection = client.GetDatabase("reporting").GetCollection<BsonDocument>("events");

    session.StartTransaction(new TransactionOptions(
        readConcern: ReadConcern.Snapshot,
        writeConcern: WriteConcern.WMajority));

    try
    {
        employeesCollection.UpdateOne(
            Builders<BsonDocument>.Filter.Eq("employee", 3),
            Builders<BsonDocument>.Update.Set("status", "Inactive"));
        eventsCollection.InsertOne(
            new BsonDocument
            {
                { "employee", 3 },
                { "status", new BsonDocument { { "new", "Inactive" }, { "old", "Active" } } }
            });
    }
    catch (Exception exception)
    {
        Console.WriteLine($"Caught exception during transaction, aborting: {exception.Message}.");
        session.AbortTransaction();
        throw;
    }

    CommitWithRetry(session);
}

public void UpdateEmployeeInfoWithTransactionRetry(IMongoClient client)
{
    // start a session
    using (var session = client.StartSession())
    {
        try
        {
            RunTransactionWithRetry(UpdateEmployeeInfo, client, session);
        }
        catch (Exception exception)
        {
            // do something with error
            Console.WriteLine($"Non transient exception caught during transaction: ${exception.Message}.");
        }
    }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

sub runTransactionWithRetry {
    my ( $txnFunc, $session ) = @_;

    LOOP: {
        eval {
            $txnFunc->($session); # performs transaction
        };
        if ( my $error = $@ ) {
            print("Transaction aborted-> Caught exception during transaction.\n");
            # If transient error, retry the whole transaction
            if ( $error->has_error_label("TransientTransactionError") ) {
                print("TransientTransactionError, retrying transaction ->..\n");
                redo LOOP;
            }
            else {
                die $error;
            }
        }
    }

    return;
}

sub commitWithRetry {
    my ($session) = @_;

    LOOP: {
        eval {
            $session->commit_transaction(); # Uses write concern set at transaction start.
            print("Transaction committed->\n");
        };
        if ( my $error = $@ ) {
            # Can retry commit
            if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
                print("UnknownTransactionCommitResult, retrying commit operation ->..\n");
                redo LOOP;
            }
            else {
                print("Error during commit ->..\n");
                die $error;
            }
        }
    }

    return;
}

# Updates two collections in a transactions

sub updateEmployeeInfo {
    my ($session) = @_;
    my $employeesCollection = $session->client->ns("hr.employees");
    my $eventsCollection    = $session->client->ns("reporting.events");

    $session->start_transaction(
        {
            readConcern  => { level => "snapshot" },
            writeConcern => { w     => "majority" },
        }
    );

    eval {
        $employeesCollection->update_one(
            { employee => 3 }, { '$set' => { status => "Inactive" } },
            { session => $session},
        );
        $eventsCollection->insert_one(
            { employee => 3, status => { new => "Inactive", old => "Active" } },
            { session => $session},
        );
    };
    if ( my $error = $@ ) {
        print("Caught exception during transaction, aborting->\n");
        $session->abort_transaction();
        die $error;
    }

    commitWithRetry($session);
}

# Start a session
my $session = $client->start_session();

eval {
    runTransactionWithRetry(\&updateEmployeeInfo, $session);
};
if ( my $error = $@ ) {
    # Do something with error
}

$session->end_session();

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

def run_transaction_with_retry(session)
  begin
    yield session # performs transaction
  rescue Mongo::Error => e
    puts 'Transaction aborted. Caught exception during transaction.'
    raise unless e.label?(Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL)
    puts "#{Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL}, retrying transaction ..."
    retry
  end
end

def commit_with_retry(session)
  begin
    session.commit_transaction
    puts 'Transaction committed.'
  rescue Mongo::Error => e
    if e.label?(Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)
      puts "#{Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL}, retrying commit operation ..."
      retry
    else
      puts 'Error during commit ...'
      raise
    end
  end
end

# updates two collections in a transaction

def update_employee_info(session)
  employees_coll = session.client.use(:hr)[:employees]
  events_coll = session.client.use(:reporting)[:events]

  session.start_transaction(read_concern: { level: :snapshot },
                            write_concern: { w: :majority })
  employees_coll.update_one({ employee: 3 }, { '$set' => { status: 'Inactive'} },
                            session: session)
  events_coll.insert_one({ employee: 3, status: { new: 'Inactive', old: 'Active' } },
                         session: session)
  commit_with_retry(session)
end

session = client.start_session

begin
  run_transaction_with_retry(session) { |s| update_employee_info(s) }
rescue StandardError => e
  # Do something with error
  raise
end

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

/*
 * Copyright 2008-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.mongodb.scala.documentation

import org.mongodb.scala.TestMongoClientHelper.hasSingleHost
import org.mongodb.scala._
import org.mongodb.scala.model.{ Filters, Updates }
import org.mongodb.scala.result.{ InsertOneResult, UpdateResult }

import scala.concurrent.Await

//scalastyle:off magic.number regex
class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {

  // Implicit functions that execute the Observable and return the results
  implicit class ObservableExecutor[T](observable: Observable[T]) {
    def execute(): Seq[T] = Await.result(observable.toFuture(), WAIT_DURATION)
  }

  implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
    def execute(): T = Await.result(observable.toFuture(), WAIT_DURATION)
  }
  // end implicit functions

  "The Scala driver" should "be able to commit a transaction" in withClient { client =>
    assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost)
    client.getDatabase("hr").drop().execute()
    client.getDatabase("hr").createCollection("employees").execute()
    client.getDatabase("hr").createCollection("events").execute()

    updateEmployeeInfoWithRetry(client).execute()
    client.getDatabase("hr").drop().execute()
  }

  def updateEmployeeInfo(
      database: MongoDatabase,
      observable: SingleObservable[ClientSession]
  ): SingleObservable[ClientSession] = {
    observable.map(clientSession => {
      val employeesCollection = database.getCollection("employees")
      val eventsCollection = database.getCollection("events")

      val transactionOptions = TransactionOptions
        .builder()
        .readPreference(ReadPreference.primary())
        .readConcern(ReadConcern.SNAPSHOT)
        .writeConcern(WriteConcern.MAJORITY)
        .build()
      clientSession.startTransaction(transactionOptions)
      employeesCollection
        .updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
        .subscribe((res: UpdateResult) => println(res))
      eventsCollection
        .insertOne(
          clientSession,
          Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active"))
        )
        .subscribe((res: InsertOneResult) => ())

      clientSession
    })
  }

  def commitAndRetry(observable: SingleObservable[Void]): SingleObservable[Void] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
        println("UnknownTransactionCommitResult, retrying commit operation ...")
        commitAndRetry(observable)
      }
      case e: Exception => {
        println(s"Exception during commit ...: $e")
        throw e
      }
    })
  }

  def runTransactionAndRetry(observable: SingleObservable[Void]): SingleObservable[Void] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
        println("TransientTransactionError, aborting transaction and retrying ...")
        runTransactionAndRetry(observable)
      }
    })
  }

  def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Void] = {

    val database = client.getDatabase("hr")
    val updateEmployeeInfoObservable: SingleObservable[ClientSession] =
      updateEmployeeInfo(database, client.startSession())
    val commitTransactionObservable: SingleObservable[Void] =
      updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
    val commitAndRetryObservable: SingleObservable[Void] = commitAndRetry(commitTransactionObservable)

    runTransactionAndRetry(commitAndRetryObservable)
  }
}
	runTransactionWithRetry := func(sctx mongo.SessionContext, txnFn func(mongo.SessionContext) error) error {
		for {
			err := txnFn(sctx) // Performs transaction.
			if err == nil {
				return nil
			}

			log.Println("Transaction aborted. Caught exception during transaction.")

			// If transient error, retry the whole transaction
			if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
				log.Println("TransientTransactionError, retrying transaction...")
				continue
			}
			return err
		}
	}

	commitWithRetry := func(sctx mongo.SessionContext) error {
		for {
			err := sctx.CommitTransaction(sctx)
			switch e := err.(type) {
			case nil:
				log.Println("Transaction committed.")
				return nil
			case mongo.CommandError:
				// Can retry commit
				if e.HasErrorLabel("UnknownTransactionCommitResult") {
					log.Println("UnknownTransactionCommitResult, retrying commit operation...")
					continue
				}
				log.Println("Error during commit...")
				return e
			default:
				log.Println("Error during commit...")
				return e
			}
		}
	}

	// Updates two collections in a transaction.
	updateEmployeeInfo := func(sctx mongo.SessionContext) error {
		employees := client.Database("hr").Collection("employees")
		events := client.Database("reporting").Collection("events")

		err := sctx.StartTransaction(options.Transaction().
			SetReadConcern(readconcern.Snapshot()).
			SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
		)
		if err != nil {
			return err
		}

		_, err = employees.UpdateOne(sctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}})
		if err != nil {
			sctx.AbortTransaction(sctx)
			log.Println("caught exception during transaction, aborting.")
			return err
		}
		_, err = events.InsertOne(sctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}})
		if err != nil {
			sctx.AbortTransaction(sctx)
			log.Println("caught exception during transaction, aborting.")
			return err
		}

		return commitWithRetry(sctx)
	}

	return client.UseSessionWithOptions(
		ctx, options.Session().SetDefaultReadPreference(readpref.Primary()),
		func(sctx mongo.SessionContext) error {
			return runTransactionWithRetry(sctx, updateEmployeeInfo)
		},
	)
}

Read Concern/Write Concern/Read Preference

Transactions and Read Concern

Operations in a transaction use the transaction-level read concern. That is, any read concern set at the collection and database level is ignored inside the transaction.

You can set the transaction-level read concern at the transaction start.

  • If the transaction-level read concern is unset, the transaction-level read concern defaults to the session-level read concern.
  • If transaction-level and the session-level read concern are unset, the transaction-level read concern defaults to the client-level read concern. By default, client-level read concern is "local" for reads against the primary. See also Transactions and Read Preference.

Multi-document transactions support the following read concern levels:

"local"

  • Read concern "local" returns the most recent data available from the node but can be rolled back.

"majority"

[1]For a three-member Primary-Secondary-Arbiter (PSA) replica set architecture, you may have disabled read concern “majority” to avoid cache pressure. Disabling "majority" does not affect transactions; i.e. you can specify read concern "majority" for transactions even if read concern "majority" is disabled.

"snapshot"

Transactions and Write Concern

Transactions use the transaction-level write concern to commit the write operations. Operations inside transactions ignore write concerns.

Tip

Do not explicitly set the write concern for the individual write operations inside a transaction.

You can set the transaction-level write concern at the transaction start:

  • If the transaction-level write concern is unset, the transaction-level write concern defaults to the session-level write concern for the commit.
  • If the transaction-level write concern and the session-level write concern are unset, transaction-level write concern defaults to the client-level write concern. By default, client-level write concern is w: 1.

Multi-document transactions support the following write concern w values:

w: 1

  • Write concern w: 1 returns acknowledgement after the commit has been applied to the primary.

    Important

    When you commit with w: 1, your transaction can be rolled back if there is a failover.

  • When you commit with w: 1 write concern, transaction-level "majority" read concern provides no guarantees that read operations in the transaction read majority-committed data.

  • When you commit with w: 1 write concern, transaction-level "snapshot" read concern provides no guarantee that read operations in the transaction used a snapshot of majority-committed data.

w: "majority"

  • Write concern w: "majority" returns acknowledgement after the commit has been applied to a majority (M) of voting members; i.e. the commit has been applied to the primary and (M-1) voting secondaries.
  • When you commit with w: "majority" write concern, transaction-level "majority" read concern guarantees that operations have read majority-committed data.
  • When you commit with w: "majority" write concern, transaction-level "snapshot" read concern guarantees that operations have from a synchronized snapshot of majority-committed data.

Transactions and Read Preference

Operations in a transaction use the transaction-level read preference.

Using the drivers, you can set the transaction-level read preference at the transaction start:

  • If the transaction-level read preference is unset, the transaction uses the session-level read preference.
  • If transaction-level and the session-level read preference are unset, the transaction uses the client-level read preference. By default, the client-level read preference is primary.

Multi-document transactions that contain read operations must use read preference primary.

All operations in a given transaction must route to the same member.

General Information

Feature Compatibility Version (FCV)

The featureCompatibilityVersion (fCV) of all members of the replica set must be 4.0 or greater. To check the fCV for a member, connect to the member and run the following command:

db.adminCommand( { getParameter: 1, featureCompatibilityVersion: 1 } )

For more information on fCV, see setFeatureCompatibilityVersion.

Storage Engines

Multi-document transactions are only available for deployments that use WiredTiger storage engine.

Multi-document transactions are not available for deployments that use in-memory storage engine or the deprecated MMAPv1 storage engine.

Diagnostics

Transactions and Security

[2]If using $external authentication users (i.e. Kerberos, LDAP, x.509 users), the usernames cannot be greater than 10k bytes.