Navigation
This is an upcoming (i.e. in progress) version of the manual.

Transactions in Applications

Highly Available Applications

Regardless of the database system, whether MongoDB or relational databases, applications should take measures to handle errors during transaction commits and incorporate retry logic for transactions.

"TransientTransactionError"

The individual write operations inside the transaction are not retryable, regardless of whether retryWrites is set to true. If an operation encounters an error associated with the label "TransientTransactionError", the transaction as a whole can be retried.

  • The callback API incorporates retry logic for "TransientTransactionError".
  • The core transaction API does not incorporate retry logic for "TransientTransactionError". To handle "TransientTransactionError", applications should explicitly incorporate retry logic for the error.

"UnknownTransactionCommitResult"

The commit operations are retryable write operations. If the commit operation encounters an error, MongoDB drivers retry the commit regardless of whether retryWrites is set to true.

If the commit operation encounters an error labeled "UnknownTransactionCommitResult", the commit can be retried.

  • The callback API incorporates retry logic for "UnknownTransactionCommitResult".
  • The core transaction API does not incorporate retry logic for "UnknownTransactionCommitResult". To handle "UnknownTransactionCommitResult", applications should explicitly incorporate retry logic for the error.

Callback API

The new callback API incorporates logic:

  • To retry the transaction as a whole if the transaction encounters a "TransientTransactionError".
  • To retry the commit operation if the commit encounters an "UnknownTransactionCommitResult".

Note

The example uses the new callback API for working with transactions. The new callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/admin'

client = MongoClient(uriString)

my_write_concern_majority = WriteConcern('majority', wtimeout=1000)

# Prereq: Create collections. CRUD operations in transactions must be on existing collections.

client.get_database(
    'mydb1',
    write_concern=my_write_concern_majority).foo.insert_one({'abc': 0})
client.get_database(
    'mydb2',
    write_concern=my_write_concern_majority).bar.insert_one({'xyz': 0})

def callback(my_session):
    collection_one = my_session.client.mydb1.foo
    collection_two = my_session.client.mydb2.bar
    collection_one.insert_one({'abc': 1}, session=my_session)
    collection_two.insert_one({'xyz': 999}, session=my_session)

with client.start_session() as session:
    session.with_transaction(callback,
                             read_concern=ReadConcern('local'),
                             write_concern=my_write_concern_majority,
                             read_preference=ReadPreference.PRIMARY)

Note

The example uses the new callback API for working with transactions. The new callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.

/*
  For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
  String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
  For a sharded cluster, connect to the mongos instances; e.g.
  String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin";
 */

final MongoClient client = MongoClients.create(uri);

/*
    Prereq: Create collections. CRUD operations in transactions must be on existing collections.
 */

client.getDatabase("mydb1").getCollection("foo")
        .withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("abc", 0));
client.getDatabase("mydb2").getCollection("bar")
        .withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("xyz", 0));

final ClientSession clientSession = client.startSession();

TransactionOptions txnOptions = TransactionOptions.builder()
        .readPreference(ReadPreference.primary())
        .readConcern(ReadConcern.LOCAL)
        .writeConcern(WriteConcern.MAJORITY)
        .build();

TransactionBody txnBody = new TransactionBody<String>() {
    public String execute() {
        MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
        MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");

        coll1.insertOne(clientSession, new Document("abc", 1));
        coll2.insertOne(clientSession, new Document("xyz", 999));

        return "Inserted into collections in different databases";
    }
};
try {
    clientSession.withTransaction(txnOptions, txnBody);
} catch (RuntimeException e) {
    // some error handling
} finally {
    clientSession.close();
}

Core API

The core transaction API does not incorporate retry logic for errors labeled:

  • "TransientTransactionError". To handle "TransientTransactionError", applications should explicitly incorporate retry logic for the error.
  • "UnknownTransactionCommitResult". To handle "UnknownTransactionCommitResult", applications should explicitly incorporate retry logic for the error.

Explicitly Handle TransientTransactionError

If an operation in a transaction returns an error labeled "TransientTransactionError", the transaction as a whole can be retried.

For example, the following code sample runs a function and retries the function if a "TransientTransactionError" is encountered:

// Runs the txnFunc and retries if TransientTransactionError encountered

function runTransactionWithRetry(txnFunc, session) {
    while (true) {
        try {
            txnFunc(session);  // performs transaction
            break;
        } catch (error) {
            print("Transaction aborted. Caught exception during transaction.");

            // If transient error, retry the whole transaction
            if ( error.hasOwnProperty("errorLabels") && error.errorLabels.includes( "TransientTransactionError")  ) {
                print("TransientTransactionError, retrying transaction ...");
                continue;
            } else {
                throw error;
            }
        }
    }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

def run_transaction_with_retry(txn_func, session):
    while True:
        try:
            txn_func(session)  # performs transaction
            break
        except (ConnectionFailure, OperationFailure) as exc:
            print("Transaction aborted. Caught exception during "
                  "transaction.")

            # If transient error, retry the whole transaction
            if exc.has_error_label("TransientTransactionError"):
                print("TransientTransactionError, retrying"
                      "transaction ...")
                continue
            else:
                raise

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

void runTransactionWithRetry(Runnable transactional) {
    while (true) {
        try {
            transactional.run();
            break;
        } catch (MongoException e) {
            System.out.println("Transaction aborted. Caught exception during transaction.");

            if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
                System.out.println("TransientTransactionError, aborting transaction and retrying ...");
                continue;
            } else {
                throw e;
            }
        }
    }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

async function runTransactionWithRetry(txnFunc, client, session) {
  try {
    await txnFunc(client, session);
  } catch (error) {
    console.log('Transaction aborted. Caught exception during transaction.');

    // If transient error, retry the whole transaction
    if (error.errorLabels && error.errorLabels.indexOf('TransientTransactionError') >= 0) {
      console.log('TransientTransactionError, retrying transaction ...');
      await runTransactionWithRetry(txnFunc, client, session);
    } else {
      throw error;
    }
  }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

private function runTransactionWithRetry1(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session)
{
    while (true) {
        try {
            $txnFunc($client, $session);  // performs transaction
            break;
        } catch (\MongoDB\Driver\Exception\CommandException $error) {
            $resultDoc = $error->getResultDocument();
            echo "Transaction aborted. Caught exception during transaction.\n";

            // If transient error, retry the whole transaction
            if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) {
                echo "TransientTransactionError, retrying transaction ...\n";
                continue;
            } else {
                throw $error;
            }
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            throw $error;
        }
    }
}
bool
run_transaction_with_retry (txn_func_t txn_func,
                            mongoc_client_session_t *cs,
                            bson_error_t *error)
{
   bson_t reply;
   bool r;

   while (true) {
      /* perform transaction */
      r = txn_func (cs, &reply, error);
      if (r) {
         /* success */
         bson_destroy (&reply);
         return true;
      }

      MONGOC_WARNING ("Transaction aborted: %s", error->message);
      if (mongoc_error_has_label (&reply, "TransientTransactionError")) {
         /* on transient error, retry the whole transaction */
         MONGOC_WARNING ("TransientTransactionError, retrying transaction...");
         bson_destroy (&reply);
      } else {
         /* non-transient error */
         break;
      }
   }

   bson_destroy (&reply);
   return false;
}


using transaction_func = std::function<void(client_session & session)>;
auto run_transaction_with_retry = [](transaction_func txn_func, client_session& session) {
    while (true) {
        try {
            txn_func(session);  // performs transaction.
            break;
        } catch (const operation_exception& oe) {
            std::cout << "Transaction aborted. Caught exception during transaction."
                      << std::endl;
            // If transient error, retry the whole transaction.
            if (oe.has_error_label("TransientTransactionError")) {
                std::cout << "TransientTransactionError, retrying transaction ..."
                          << std::endl;
                continue;
            } else {
                throw oe;
            }
        }
    }
};
public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session)
{
    while (true)
    {
        try
        {
            txnFunc(client, session); // performs transaction
            break;
        }
        catch (MongoException exception)
        {
            Console.WriteLine($"Transaction aborted. Caught exception during transaction: ${exception.Message}.");

            // if transient error, retry the whole transaction
            if (exception.HasErrorLabel("TransientTransactionError"))
            {
                Console.WriteLine("TransientTransactionError, retrying transaction.");
                continue;
            }
            else
            {
                throw;
            }  
        }
    }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

sub runTransactionWithRetry {
    my ( $txnFunc, $session ) = @_;

    LOOP: {
        eval {
            $txnFunc->($session); # performs transaction
        };
        if ( my $error = $@ ) {
            print("Transaction aborted-> Caught exception during transaction.\n");
            # If transient error, retry the whole transaction
            if ( $error->has_error_label("TransientTransactionError") ) {
                print("TransientTransactionError, retrying transaction ->..\n");
                redo LOOP;
            }
            else {
                die $error;
            }
        }
    }

    return;
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

def run_transaction_with_retry(session)
  begin
    yield session # performs transaction
  rescue Mongo::Error => e

    puts 'Transaction aborted. Caught exception during transaction.'
    raise unless e.label?(Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL)

    puts "#{Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL}, retrying transaction ..."
    retry
  end
end

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

  def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
        println("TransientTransactionError, aborting transaction and retrying ...")
        runTransactionAndRetry(observable)
      }
    })
  }
// RunTransactionWithRetry is an example function demonstrating transaction retry logic.
func RunTransactionWithRetry(sctx mongo.SessionContext, txnFn func(mongo.SessionContext) error) error {
	for {
		err := txnFn(sctx) // Performs transaction.
		if err == nil {
			return nil
		}

		log.Println("Transaction aborted. Caught exception during transaction.")

		// If transient error, retry the whole transaction
		if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
			log.Println("TransientTransactionError, retrying transaction...")
			continue
		}
		return err
	}
}

Explicitly Handle UnknownTransactionCommitResult

If the commit returns an error labeled "UnknownTransactionCommitResult", the commit can be retried.

For example, the following code sample commits a transaction and retries the commit if a "UnknownTransactionCommitResult" error is encountered:

// Retries commit if UnknownTransactionCommitResult encountered

function commitWithRetry(session) {
    while (true) {
        try {
            session.commitTransaction(); // Uses write concern set at transaction start.
            print("Transaction committed.");
            break;
        } catch (error) {
            // Can retry commit
            if (error.hasOwnProperty("errorLabels") && error.errorLabels.includes( "UnknownTransactionCommitResult") ) {
                print("UnknownTransactionCommitResult, retrying commit operation ...");
                continue;
            } else {
                print("Error during commit ...");
                throw error;
            }
       }
    }
}
def commit_with_retry(session):
    while True:
        try:
            # Commit uses write concern set at transaction start.
            session.commit_transaction()
            print("Transaction committed.")
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # Can retry commit
            if exc.has_error_label("UnknownTransactionCommitResult"):
                print("UnknownTransactionCommitResult, retrying "
                      "commit operation ...")
                continue
            else:
                print("Error during commit ...")
                raise
void commitWithRetry(ClientSession clientSession) {
    while (true) {
        try {
            clientSession.commitTransaction();
            System.out.println("Transaction committed");
            break;
        } catch (MongoException e) {
            // can retry commit
            if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
                System.out.println("UnknownTransactionCommitResult, retrying commit operation ...");
                continue;
            } else {
                System.out.println("Exception during commit ...");
                throw e;
            }
        }
    }
}
async function commitWithRetry(session) {
  try {
    await session.commitTransaction();
    console.log('Transaction committed.');
  } catch (error) {
    if (
      error.errorLabels &&
      error.errorLabels.indexOf('UnknownTransactionCommitResult') >= 0
    ) {
      console.log('UnknownTransactionCommitResult, retrying commit operation ...');
      await commitWithRetry(session);
    } else {
      console.log('Error during commit ...');
      throw error;
    }
  }
}
private function commitWithRetry2(\MongoDB\Driver\Session $session)
{
    while (true) {
        try {
            $session->commitTransaction();
            echo "Transaction committed.\n";
            break;
        } catch (\MongoDB\Driver\Exception\CommandException $error) {
            $resultDoc = $error->getResultDocument();

            if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) {
                echo "UnknownTransactionCommitResult, retrying commit operation ...\n";
                continue;
            } else {
                echo "Error during commit ...\n";
                throw $error;
            }
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            echo "Error during commit ...\n";
            throw $error;
        }
    }
}
bool
commit_with_retry (mongoc_client_session_t *cs, bson_error_t *error)
{
   bson_t reply;
   bool r;

   while (true) {
      /* commit uses write concern set at transaction start, see
       * mongoc_transaction_opts_set_write_concern */
      r = mongoc_client_session_commit_transaction (cs, &reply, error);
      if (r) {
         MONGOC_INFO ("Transaction committed");
         break;
      }

      if (mongoc_error_has_label (&reply, "UnknownTransactionCommitResult")) {
         MONGOC_WARNING ("UnknownTransactionCommitResult, retrying commit ...");
         bson_destroy (&reply);
      } else {
         /* commit failed, cannot retry */
         break;
      }
   }

   bson_destroy (&reply);

   return r;
}


auto commit_with_retry = [](client_session& session) {
    while (true) {
        try {
            session.commit_transaction();  // Uses write concern set at transaction start.
            std::cout << "Transaction committed." << std::endl;
            break;
        } catch (const operation_exception& oe) {
            // Can retry commit
            if (oe.has_error_label("UnknownTransactionCommitResult")) {
                std::cout << "UnknownTransactionCommitResult, retrying commit operation ..."
                          << std::endl;
                continue;
            } else {
                std::cout << "Error during commit ..." << std::endl;
                throw oe;
            }
        }
    }
};
public void CommitWithRetry(IClientSessionHandle session)
{
    while (true)
    {
        try
        {
            session.CommitTransaction(); // uses write concern set at transaction start
            Console.WriteLine("Transaction committed.");
            break;
        }
        catch (MongoException exception)
        {
            // can retry commit
            if (exception.HasErrorLabel("UnknownTransactionCommitResult"))
            {
                Console.WriteLine("UnknownTransactionCommitResult, retrying commit operation.");
                continue;
            }
            else
            {
                Console.WriteLine($"Excpetion during commit: {exception.Message}.");
                throw;
            }
        }
    }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

sub commitWithRetry {
    my ($session) = @_;

    LOOP: {
        eval {
            $session->commit_transaction(); # Uses write concern set at transaction start.
            print("Transaction committed->\n");
        };
        if ( my $error = $@ ) {
            # Can retry commit
            if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
                print("UnknownTransactionCommitResult, retrying commit operation ->..\n");
                redo LOOP;
            }
            else {
                print("Error during commit ->..\n");
                die $error;
            }
        }
    }

    return;
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

def commit_with_retry(session)
  begin
    session.commit_transaction
    puts 'Transaction committed.'
  rescue Mongo::Error=> e
    if e.label?(Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)
      puts "#{Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL}, retrying commit operation..."
      retry
    else
      puts 'Error during commit ...'
      raise
    end
  end
end

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

  def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
        println("UnknownTransactionCommitResult, retrying commit operation ...")
        commitAndRetry(observable)
      }
      case e: Exception => {
        println(s"Exception during commit ...: $e")
        throw e
      }
    })
  }
// CommitWithRetry is an example function demonstrating transaction commit with retry logic.
func CommitWithRetry(sctx mongo.SessionContext) error {
	for {
		err := sctx.CommitTransaction(sctx)
		switch e := err.(type) {
		case nil:
			log.Println("Transaction committed.")
			return nil
		case mongo.CommandError:
			// Can retry commit
			if e.HasErrorLabel("UnknownTransactionCommitResult") {
				log.Println("UnknownTransactionCommitResult, retrying commit operation...")
				continue
			}
			log.Println("Error during commit...")
			return e
		default:
			log.Println("Error during commit...")
			return e
		}
	}
}

Retry Transaction and Commit Operation

Incorporating logic to retrying the transaction for transient errors and retrying the commit, the full code example is:

// Runs the txnFunc and retries if TransientTransactionError encountered

function runTransactionWithRetry(txnFunc, session) {
    while (true) {
        try {
            txnFunc(session);  // performs transaction
            break;
        } catch (error) {
            // If transient error, retry the whole transaction
            if ( error.hasOwnProperty("errorLabels") && error.errorLabels.includes("TransientTransactionError")  ) {
                print("TransientTransactionError, retrying transaction ...");
                continue;
            } else {
                throw error;
            }
        }
    }
}

// Retries commit if UnknownTransactionCommitResult encountered

function commitWithRetry(session) {
    while (true) {
        try {
            session.commitTransaction(); // Uses write concern set at transaction start.
            print("Transaction committed.");
            break;
        } catch (error) {
            // Can retry commit
            if (error.hasOwnProperty("errorLabels") && error.errorLabels.includes("UnknownTransactionCommitResult") ) {
                print("UnknownTransactionCommitResult, retrying commit operation ...");
                continue;
            } else {
                print("Error during commit ...");
                throw error;
            }
       }
    }
}

// Updates two collections in a transactions

function updateEmployeeInfo(session) {
    employeesCollection = session.getDatabase("hr").employees;
    eventsCollection = session.getDatabase("reporting").events;

    session.startTransaction( { readConcern: { level: "snapshot" }, writeConcern: { w: "majority" } } );

    try{
        employeesCollection.updateOne( { employee: 3 }, { $set: { status: "Inactive" } } );
        eventsCollection.insertOne( { employee: 3, status: { new: "Inactive", old: "Active" } } );
    } catch (error) {
        print("Caught exception during transaction, aborting.");
        session.abortTransaction();
        throw error;
    }

    commitWithRetry(session);
}

// Start a session.
session = db.getMongo().startSession( { readPreference: { mode: "primary" } } );

try{
   runTransactionWithRetry(updateEmployeeInfo, session);
} catch (error) {
   // Do something with error
} finally {
   session.endSession();
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.


def run_transaction_with_retry(txn_func, session):
    while True:
        try:
            txn_func(session)  # performs transaction
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # If transient error, retry the whole transaction
            if exc.has_error_label("TransientTransactionError"):
                print("TransientTransactionError, retrying "
                      "transaction ...")
                continue
            else:
                raise

def commit_with_retry(session):
    while True:
        try:
            # Commit uses write concern set at transaction start.
            session.commit_transaction()
            print("Transaction committed.")
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # Can retry commit
            if exc.has_error_label("UnknownTransactionCommitResult"):
                print("UnknownTransactionCommitResult, retrying "
                      "commit operation ...")
                continue
            else:
                print("Error during commit ...")
                raise

# Updates two collections in a transactions

def update_employee_info(session):
    employees_coll = session.client.hr.employees
    events_coll = session.client.reporting.events

    with session.start_transaction(
            read_concern=ReadConcern("snapshot"),
            write_concern=WriteConcern(w="majority"),
            read_preference=ReadPreference.PRIMARY):
        employees_coll.update_one(
            {"employee": 3}, {"$set": {"status": "Inactive"}},
            session=session)
        events_coll.insert_one(
            {"employee": 3, "status": {
                "new": "Inactive", "old": "Active"}},
            session=session)

        commit_with_retry(session)

# Start a session.
with client.start_session() as session:
    try:
        run_transaction_with_retry(update_employee_info, session)
    except Exception as exc:
        # Do something with error.
        raise

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

void runTransactionWithRetry(Runnable transactional) {
    while (true) {
        try {
            transactional.run();
            break;
        } catch (MongoException e) {
            System.out.println("Transaction aborted. Caught exception during transaction.");

            if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
                System.out.println("TransientTransactionError, aborting transaction and retrying ...");
                continue;
            } else {
                throw e;
            }
        }
    }
}

void commitWithRetry(ClientSession clientSession) {
    while (true) {
        try {
            clientSession.commitTransaction();
            System.out.println("Transaction committed");
            break;
        } catch (MongoException e) {
            // can retry commit
            if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
                System.out.println("UnknownTransactionCommitResult, retrying commit operation ...");
                continue;
            } else {
                System.out.println("Exception during commit ...");
                throw e;
            }
        }
    }
}

void updateEmployeeInfo() {

    MongoCollection<Document> employeesCollection = client.getDatabase("hr").getCollection("employees");
    MongoCollection<Document> eventsCollection = client.getDatabase("reporting").getCollection("events");

    TransactionOptions txnOptions = TransactionOptions.builder()
            .readPreference(ReadPreference.primary())
            .readConcern(ReadConcern.MAJORITY)
            .writeConcern(WriteConcern.MAJORITY)
            .build();

    try (ClientSession clientSession = client.startSession()) {
        clientSession.startTransaction(txnOptions);

        employeesCollection.updateOne(clientSession,
                Filters.eq("employee", 3),
                Updates.set("status", "Inactive"));
        eventsCollection.insertOne(clientSession,
                new Document("employee", 3).append("status", new Document("new", "Inactive").append("old", "Active")));

        commitWithRetry(clientSession);
    }
}


void updateEmployeeInfoWithRetry() {
    runTransactionWithRetry(this::updateEmployeeInfo);
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

async function commitWithRetry(session) {
  try {
    await session.commitTransaction();
    console.log('Transaction committed.');
  } catch (error) {
    if (
      error.errorLabels &&
      error.errorLabels.indexOf('UnknownTransactionCommitResult') >= 0
    ) {
      console.log('UnknownTransactionCommitResult, retrying commit operation ...');
      await commitWithRetry(session);
    } else {
      console.log('Error during commit ...');
      throw error;
    }
  }
}

async function runTransactionWithRetry(txnFunc, client, session) {
  try {
    await txnFunc(client, session);
  } catch (error) {
    console.log('Transaction aborted. Caught exception during transaction.');

    // If transient error, retry the whole transaction
    if (error.errorLabels && error.errorLabels.indexOf('TransientTransactionError') >= 0) {
      console.log('TransientTransactionError, retrying transaction ...');
      await runTransactionWithRetry(txnFunc, client, session);
    } else {
      throw error;
    }
  }
}

async function updateEmployeeInfo(client, session) {
  session.startTransaction({
    readConcern: { level: 'snapshot' },
    writeConcern: { w: 'majority' },
    readPreference: 'primary'
  });

  const employeesCollection = client.db('hr').collection('employees');
  const eventsCollection = client.db('reporting').collection('events');

  await employeesCollection.updateOne(
    { employee: 3 },
    { $set: { status: 'Inactive' } },
    { session }
  );
  await eventsCollection.insertOne(
    {
      employee: 3,
      status: { new: 'Inactive', old: 'Active' }
    },
    { session }
  );

  try {
    await commitWithRetry(session);
  } catch (error) {
    await session.abortTransaction();
    throw error;
  }
}

return client.withSession(session =>
  runTransactionWithRetry(updateEmployeeInfo, client, session)
);

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

private function runTransactionWithRetry3(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session)
{
    while (true) {
        try {
            $txnFunc($client, $session);  // performs transaction
            break;
        } catch (\MongoDB\Driver\Exception\CommandException $error) {
            $resultDoc = $error->getResultDocument();

            // If transient error, retry the whole transaction
            if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) {
                continue;
            } else {
                throw $error;
            }
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            throw $error;
        }
    }
}

private function commitWithRetry3(\MongoDB\Driver\Session $session)
{
    while (true) {
        try {
            $session->commitTransaction();
            echo "Transaction committed.\n";
            break;
        } catch (\MongoDB\Driver\Exception\CommandException $error) {
            $resultDoc = $error->getResultDocument();

            if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) {
                echo "UnknownTransactionCommitResult, retrying commit operation ...\n";
                continue;
            } else {
                echo "Error during commit ...\n";
                throw $error;
            }
        } catch (\MongoDB\Driver\Exception\Exception $error) {
            echo "Error during commit ...\n";
            throw $error;
        }
    }
}

private function updateEmployeeInfo3(\MongoDB\Client $client, \MongoDB\Driver\Session $session)
{
    $session->startTransaction([
        'readConcern' => new \MongoDB\Driver\ReadConcern("snapshot"),
        'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY)
    ]);

    try {
        $client->hr->employees->updateOne(
            ['employee' => 3],
            ['$set' => ['status' => 'Inactive']],
            ['session' => $session]
        );
        $client->reporting->events->insertOne(
            ['employee' => 3, 'status' => [ 'new' => 'Inactive', 'old' => 'Active']],
            ['session' => $session]
        );
    } catch (\MongoDB\Driver\Exception\Exception $error) {
        echo "Caught exception during transaction, aborting.\n";
        $session->abortTransaction();
        throw $error;
    }

    $this->commitWithRetry3($session);
}

private function doUpdateEmployeeInfo(\MongoDB\Client $client)
{
    // Start a session.
    $session = $client->startSession();

    try {
        $this->runTransactionWithRetry3([$this, 'updateEmployeeInfo3'], $client, $session);
    } catch (\MongoDB\Driver\Exception\Exception $error) {
        // Do something with error
    }
}
/* takes a session, an out-param for server reply, and out-param for error. */
typedef bool (*txn_func_t) (mongoc_client_session_t *,
                            bson_t *,
                            bson_error_t *);


/* runs transactions with retry logic */
bool
run_transaction_with_retry (txn_func_t txn_func,
                            mongoc_client_session_t *cs,
                            bson_error_t *error)
{
   bson_t reply;
   bool r;

   while (true) {
      /* perform transaction */
      r = txn_func (cs, &reply, error);
      if (r) {
         /* success */
         bson_destroy (&reply);
         return true;
      }

      MONGOC_WARNING ("Transaction aborted: %s", error->message);
      if (mongoc_error_has_label (&reply, "TransientTransactionError")) {
         /* on transient error, retry the whole transaction */
         MONGOC_WARNING ("TransientTransactionError, retrying transaction...");
         bson_destroy (&reply);
      } else {
         /* non-transient error */
         break;
      }
   }

   bson_destroy (&reply);
   return false;
}


/* commit transactions with retry logic */
bool
commit_with_retry (mongoc_client_session_t *cs, bson_error_t *error)
{
   bson_t reply;
   bool r;

   while (true) {
      /* commit uses write concern set at transaction start, see
       * mongoc_transaction_opts_set_write_concern */
      r = mongoc_client_session_commit_transaction (cs, &reply, error);
      if (r) {
         MONGOC_INFO ("Transaction committed");
         break;
      }

      if (mongoc_error_has_label (&reply, "UnknownTransactionCommitResult")) {
         MONGOC_WARNING ("UnknownTransactionCommitResult, retrying commit ...");
         bson_destroy (&reply);
      } else {
         /* commit failed, cannot retry */
         break;
      }
   }

   bson_destroy (&reply);

   return r;
}


/* updates two collections in a transaction and calls commit_with_retry */
bool
update_employee_info (mongoc_client_session_t *cs,
                      bson_t *reply,
                      bson_error_t *error)
{
   mongoc_client_t *client;
   mongoc_collection_t *employees;
   mongoc_collection_t *events;
   mongoc_read_concern_t *rc;
   mongoc_write_concern_t *wc;
   mongoc_transaction_opt_t *txn_opts;
   bson_t opts = BSON_INITIALIZER;
   bson_t *filter = NULL;
   bson_t *update = NULL;
   bson_t *event = NULL;
   bool r;

   bson_init (reply);

   client = mongoc_client_session_get_client (cs);
   employees = mongoc_client_get_collection (client, "hr", "employees");
   events = mongoc_client_get_collection (client, "reporting", "events");

   rc = mongoc_read_concern_new ();
   mongoc_read_concern_set_level (rc, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
   wc = mongoc_write_concern_new ();
   mongoc_write_concern_set_w (wc, MONGOC_WRITE_CONCERN_W_MAJORITY);
   txn_opts = mongoc_transaction_opts_new ();
   mongoc_transaction_opts_set_read_concern (txn_opts, rc);
   mongoc_transaction_opts_set_write_concern (txn_opts, wc);

   r = mongoc_client_session_start_transaction (cs, txn_opts, error);
   if (!r) {
      goto done;
   }

   r = mongoc_client_session_append (cs, &opts, error);
   if (!r) {
      goto done;
   }

   filter = BCON_NEW ("employee", BCON_INT32 (3));
   update = BCON_NEW ("$set", "{", "status", "Inactive", "}");
   /* mongoc_collection_update_one will reinitialize reply */
   bson_destroy (reply);
   r = mongoc_collection_update_one (
      employees, filter, update, &opts, reply, error);

   if (!r) {
      goto abort;
   }

   event = BCON_NEW ("employee", BCON_INT32 (3));
   BCON_APPEND (event, "status", "{", "new", "Inactive", "old", "Active", "}");

   bson_destroy (reply);
   r = mongoc_collection_insert_one (events, event, &opts, reply, error);
   if (!r) {
      goto abort;
   }

   r = commit_with_retry (cs, error);

abort:
   if (!r) {
      MONGOC_ERROR ("Aborting due to error in transaction: %s", error->message);
      mongoc_client_session_abort_transaction (cs, NULL);
   }

done:
   mongoc_collection_destroy (employees);
   mongoc_collection_destroy (events);
   mongoc_read_concern_destroy (rc);
   mongoc_write_concern_destroy (wc);
   mongoc_transaction_opts_destroy (txn_opts);
   bson_destroy (&opts);
   bson_destroy (filter);
   bson_destroy (update);
   bson_destroy (event);

   return r;
}


void
example_func (mongoc_client_t *client)
{
   mongoc_client_session_t *cs;
   bson_error_t error;
   bool r;

   cs = mongoc_client_start_session (client, NULL, &error);
   if (!cs) {
      MONGOC_ERROR ("Could not start session: %s", error.message);
      return;
   }

   r = run_transaction_with_retry (update_employee_info, cs, &error);
   if (!r) {
      MONGOC_ERROR ("Could not update employee, permanent error: %s",
                    error.message);
   }

   mongoc_client_session_destroy (cs);
}
using transaction_func = std::function<void(client_session & session)>;
auto run_transaction_with_retry = [](transaction_func txn_func, client_session& session) {
    while (true) {
        try {
            txn_func(session);  // performs transaction.
            break;
        } catch (const operation_exception& oe) {
            std::cout << "Transaction aborted. Caught exception during transaction."
                      << std::endl;
            // If transient error, retry the whole transaction.
            if (oe.has_error_label("TransientTransactionError")) {
                std::cout << "TransientTransactionError, retrying transaction ..."
                          << std::endl;
                continue;
            } else {
                throw oe;
            }
        }
    }
};

auto commit_with_retry = [](client_session& session) {
    while (true) {
        try {
            session.commit_transaction();  // Uses write concern set at transaction start.
            std::cout << "Transaction committed." << std::endl;
            break;
        } catch (const operation_exception& oe) {
            // Can retry commit
            if (oe.has_error_label("UnknownTransactionCommitResult")) {
                std::cout << "UnknownTransactionCommitResult, retrying commit operation ..."
                          << std::endl;
                continue;
            } else {
                std::cout << "Error during commit ..." << std::endl;
                throw oe;
            }
        }
    }
};

// Updates two collections in a transaction
auto update_employee_info = [&](client_session& session) {
    auto& client = session.client();
    auto employees = client["hr"]["employees"];
    auto events = client["reporting"]["events"];

    options::transaction txn_opts;
    read_concern rc;
    rc.acknowledge_level(read_concern::level::k_snapshot);
    txn_opts.read_concern(rc);
    write_concern wc;
    wc.acknowledge_level(write_concern::level::k_majority);
    txn_opts.write_concern(wc);

    session.start_transaction(txn_opts);

    try {
        employees.update_one(
            make_document(kvp("employee", 3)),
            make_document(kvp("$set", make_document(kvp("status", "Inactive")))));
        events.insert_one(make_document(
            kvp("employee", 3),
            kvp("status", make_document(kvp("new", "Inactive"), kvp("old", "Active")))));
    } catch (const operation_exception& oe) {
        std::cout << "Caught exception during transaction, aborting." << std::endl;
        session.abort_transaction();
        throw oe;
    }

    commit_with_retry(session);
};

auto session = client.start_session();
try {
    run_transaction_with_retry(update_employee_info, session);
} catch (const operation_exception& oe) {
    // Do something with error.
    throw oe;
}
public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session)
{
    while (true)
    {
        try
        {
            txnFunc(client, session); // performs transaction
            break;
        }
        catch (MongoException exception)
        {
            // if transient error, retry the whole transaction
            if (exception.HasErrorLabel("TransientTransactionError"))
            {
                Console.WriteLine("TransientTransactionError, retrying transaction.");
                continue;
            }
            else
            {
                throw;
            }
        }
    }
}

public void CommitWithRetry(IClientSessionHandle session)
{
    while (true)
    {
        try
        {
            session.CommitTransaction();
            Console.WriteLine("Transaction committed.");
            break;
        }
        catch (MongoException exception)
        {
            // can retry commit
            if (exception.HasErrorLabel("UnknownTransactionCommitResult"))
            {
                Console.WriteLine("UnknownTransactionCommitResult, retrying commit operation");
                continue;
            }
            else
            {
                Console.WriteLine($"Error during commit: {exception.Message}.");
                throw;
            }
        }
    }
}

// updates two collections in a transaction
public void UpdateEmployeeInfo(IMongoClient client, IClientSessionHandle session)
{
    var employeesCollection = client.GetDatabase("hr").GetCollection<BsonDocument>("employees");
    var eventsCollection = client.GetDatabase("reporting").GetCollection<BsonDocument>("events");

    session.StartTransaction(new TransactionOptions(
        readConcern: ReadConcern.Snapshot,
        writeConcern: WriteConcern.WMajority));

    try
    {
        employeesCollection.UpdateOne(
            session,
            Builders<BsonDocument>.Filter.Eq("employee", 3),
            Builders<BsonDocument>.Update.Set("status", "Inactive"));
        eventsCollection.InsertOne(
            session,
            new BsonDocument
            {
                { "employee", 3 },
                { "status", new BsonDocument { { "new", "Inactive" }, { "old", "Active" } } }
            });
    }
    catch (Exception exception)
    {
        Console.WriteLine($"Caught exception during transaction, aborting: {exception.Message}.");
        session.AbortTransaction();
        throw;
    }

    CommitWithRetry(session);
}

public void UpdateEmployeeInfoWithTransactionRetry(IMongoClient client)
{
    // start a session
    using (var session = client.StartSession())
    {
        try
        {
            RunTransactionWithRetry(UpdateEmployeeInfo, client, session);
        }
        catch (Exception exception)
        {
            // do something with error
            Console.WriteLine($"Non transient exception caught during transaction: ${exception.Message}.");
        }
    }
}

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

sub runTransactionWithRetry {
    my ( $txnFunc, $session ) = @_;

    LOOP: {
        eval {
            $txnFunc->($session); # performs transaction
        };
        if ( my $error = $@ ) {
            print("Transaction aborted-> Caught exception during transaction.\n");
            # If transient error, retry the whole transaction
            if ( $error->has_error_label("TransientTransactionError") ) {
                print("TransientTransactionError, retrying transaction ->..\n");
                redo LOOP;
            }
            else {
                die $error;
            }
        }
    }

    return;
}

sub commitWithRetry {
    my ($session) = @_;

    LOOP: {
        eval {
            $session->commit_transaction(); # Uses write concern set at transaction start.
            print("Transaction committed->\n");
        };
        if ( my $error = $@ ) {
            # Can retry commit
            if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
                print("UnknownTransactionCommitResult, retrying commit operation ->..\n");
                redo LOOP;
            }
            else {
                print("Error during commit ->..\n");
                die $error;
            }
        }
    }

    return;
}

# Updates two collections in a transactions

sub updateEmployeeInfo {
    my ($session) = @_;
    my $employeesCollection = $session->client->ns("hr.employees");
    my $eventsCollection    = $session->client->ns("reporting.events");

    $session->start_transaction(
        {
            readConcern  => { level => "snapshot" },
            writeConcern => { w     => "majority" },
            readPreference => 'primary',
        }
    );

    eval {
        $employeesCollection->update_one(
            { employee => 3 }, { '$set' => { status => "Inactive" } },
            { session => $session},
        );
        $eventsCollection->insert_one(
            { employee => 3, status => { new => "Inactive", old => "Active" } },
            { session => $session},
        );
    };
    if ( my $error = $@ ) {
        print("Caught exception during transaction, aborting->\n");
        $session->abort_transaction();
        die $error;
    }

    commitWithRetry($session);
}

# Start a session
my $session = $client->start_session();

eval {
    runTransactionWithRetry(\&updateEmployeeInfo, $session);
};
if ( my $error = $@ ) {
    # Do something with error
}

$session->end_session();

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

def run_transaction_with_retry(session)
  begin
    yield session # performs transaction
  rescue Mongo::Error => e
    puts 'Transaction aborted. Caught exception during transaction.'
    raise unless e.label?(Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL)
    puts "#{Mongo::Error::TRANSIENT_TRANSACTION_ERROR_LABEL}, retrying transaction ..."
    retry
  end
end

def commit_with_retry(session)
  begin
    session.commit_transaction
    puts 'Transaction committed.'
  rescue Mongo::Error => e
    if e.label?(Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)
      puts "#{Mongo::Error::UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL}, retrying commit operation ..."
      retry
    else
      puts 'Error during commit ...'
      raise
    end
  end
end

# updates two collections in a transaction

def update_employee_info(session)
  employees_coll = session.client.use(:hr)[:employees]
  events_coll = session.client.use(:reporting)[:events]

  session.start_transaction(read_concern: { level: :snapshot },
                            write_concern: { w: :majority })
  employees_coll.update_one({ employee: 3 }, { '$set' => { status: 'Inactive'} },
                            session: session)
  events_coll.insert_one({ employee: 3, status: { new: 'Inactive', old: 'Active' } },
                         session: session)
  commit_with_retry(session)
end

session = client.start_session

begin
  run_transaction_with_retry(session) { |s| update_employee_info(s) }
rescue StandardError => e
  # Do something with error
  raise
end

Important

To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.

  def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
    observable.map(clientSession => {
      val employeesCollection = database.getCollection("employees")
      val eventsCollection = database.getCollection("events")

      val transactionOptions = TransactionOptions.builder().readConcern(ReadConcern.SNAPSHOT).writeConcern(WriteConcern.MAJORITY).build()
      clientSession.startTransaction(transactionOptions)
      employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
        .subscribe((res: UpdateResult) => println(res))
      eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
        .subscribe((res: Completed) => println(res))

      clientSession
    })
  }

  def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
        println("UnknownTransactionCommitResult, retrying commit operation ...")
        commitAndRetry(observable)
      }
      case e: Exception => {
        println(s"Exception during commit ...: $e")
        throw e
      }
    })
  }

  def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
        println("TransientTransactionError, aborting transaction and retrying ...")
        runTransactionAndRetry(observable)
      }
    })
  }

  def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {

    val database = client.getDatabase("hr")
    val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
    val commitTransactionObservable: SingleObservable[Completed] =
      updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
    val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)

    runTransactionAndRetry(commitAndRetryObservable)
  }
}
	runTransactionWithRetry := func(sctx mongo.SessionContext, txnFn func(mongo.SessionContext) error) error {
		for {
			err := txnFn(sctx) // Performs transaction.
			if err == nil {
				return nil
			}

			log.Println("Transaction aborted. Caught exception during transaction.")

			// If transient error, retry the whole transaction
			if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
				log.Println("TransientTransactionError, retrying transaction...")
				continue
			}
			return err
		}
	}

	commitWithRetry := func(sctx mongo.SessionContext) error {
		for {
			err := sctx.CommitTransaction(sctx)
			switch e := err.(type) {
			case nil:
				log.Println("Transaction committed.")
				return nil
			case mongo.CommandError:
				// Can retry commit
				if e.HasErrorLabel("UnknownTransactionCommitResult") {
					log.Println("UnknownTransactionCommitResult, retrying commit operation...")
					continue
				}
				log.Println("Error during commit...")
				return e
			default:
				log.Println("Error during commit...")
				return e
			}
		}
	}

	// Updates two collections in a transaction.
	updateEmployeeInfo := func(sctx mongo.SessionContext) error {
		employees := client.Database("hr").Collection("employees")
		events := client.Database("reporting").Collection("events")

		err := sctx.StartTransaction(options.Transaction().
			SetReadConcern(readconcern.Snapshot()).
			SetWriteConcern(writeconcern.New(writeconcern.WMajority())),
		)
		if err != nil {
			return err
		}

		_, err = employees.UpdateOne(sctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}})
		if err != nil {
			sctx.AbortTransaction(sctx)
			log.Println("caught exception during transaction, aborting.")
			return err
		}
		_, err = events.InsertOne(sctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}})
		if err != nil {
			sctx.AbortTransaction(sctx)
			log.Println("caught exception during transaction, aborting.")
			return err
		}

		return commitWithRetry(sctx)
	}

	return client.UseSessionWithOptions(
		ctx, options.Session().SetDefaultReadPreference(readpref.Primary()),
		func(sctx mongo.SessionContext) error {
			return runTransactionWithRetry(sctx, updateEmployeeInfo)
		},
	)
}