0

There is an example of Scala mongodb transaction:

https://github.com/mongodb/mongo-scala-driver/blob/r2.4.0/driver/src/it/scala/org/mongodb/scala/DocumentationTransactionsExampleSpec.scala

But it's not clear how to rollback the transaction in case of failure.

Here is the code I copied from official example but modified a bit to make the transaction fail in the second insertion (inserting 2 documents with same ids), but problem is that the first document is persisted, and I need the WHOLE transaction to be rolled back.

import org.mongodb.scala._
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object Application extends App {

  val mongoClient: MongoClient = MongoClient("mongodb://localhost:27018")

  val database = mongoClient.getDatabase("hr")
  val employeesCollection = database.getCollection("employees")

  // Implicit functions that execute the Observable and return the results
  val waitDuration = Duration(5, "seconds")
  implicit class ObservableExecutor[T](observable: Observable[T]) {
    def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
  }

  implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
    def execute(): T = Await.result(observable.toFuture(), waitDuration)
  }


  updateEmployeeInfoWithRetry(mongoClient).execute()

  Thread.sleep(3000)

  /// -------------------------


  def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
    observable.map(clientSession => {
      val eventsCollection = database.getCollection("events")

      val transactionOptions = TransactionOptions.builder().readConcern(ReadConcern.SNAPSHOT).writeConcern(WriteConcern.MAJORITY).build()
      clientSession.startTransaction(transactionOptions)

      eventsCollection.insertOne(clientSession, Document("_id" -> "123", "employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
        .subscribe((res: Completed) => println(res))

      // THIS SHOULD FAIL, SINCE  THERE IS ALREADY DOCUMENT WITH ID = 123, but PREVIOUS OPERATION SHOULD BE ALSO ROLLED BACK.
      // I COULD NOT FIND THE WAY HOW TO ROLLBACK WHOLE TRANSACTION IF ONE OF OPERATIONS FAILED
      eventsCollection.insertOne(clientSession, Document("_id" -> "123", "employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
        .subscribe((res: Completed) => println(res))
      // I'VE TRIED VARIOUS THINGS (INCLUDING CODE BELOW)
//        .subscribe(new Observer[Completed] {
//          override def onNext(result: Completed): Unit = println("onNext")
//
//          override def onError(e: Throwable): Unit = clientSession.abortTransaction()
//
//          override def onComplete(): Unit = println("complete")
//        })
      clientSession
    })
  }

  def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
        println("UnknownTransactionCommitResult, retrying commit operation ...")
        commitAndRetry(observable)
      }
      case e: Exception => {
        println(s"Exception during commit ...: $e")
        throw e
      }
    })
  }

  def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
    observable.recoverWith({
      case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
        println("TransientTransactionError, aborting transaction and retrying ...")
        runTransactionAndRetry(observable)
      }
    })
  }

  def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {

    val database = client.getDatabase("hr")
    val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
    val commitTransactionObservable: SingleObservable[Completed] =
      updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
    val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)

    runTransactionAndRetry(commitAndRetryObservable)
  }
}

How to rollback the whole transaction if any operation failed?

4
  • Are you inserting both documents as part of the same transaction? Can you elaborate on your scenario? The _id index is always unique, so if there are two independent inserts with the same _id only the first one will succeed. It sounds unusual to be creating documents with duplicate keys in the same transaction, however if any operation in a transaction fails the transaction will abort and discard all data changes made in the transaction. Transaction commit/rollback support is a server feature in MongoDB 4.0+; your application does not explicitly rollback individual changes. Commented Apr 30, 2019 at 8:04
  • @Stennie I insert two documents with same ids just to check whether the transaction will be rolled back. "your application does not explicitly rollback individual changes." - so that's the question, I did not find the way to explicitly rollback transaction in case of failure. Commented May 5, 2019 at 19:58
  • Can you edit your question to include the actual code you are testing? Assuming you have correctly set up a transaction, if the transaction aborts (for example, on a duplicate key exception) the changes in that transaction will not be committed. If both inserts are part of the same transaction, I would expect the transaction to fail without committing any changes. Your current description sounds like expected behaviour for two documents that are independently inserted with the same _id value: the first will succeed and subsequent attempts will fail with a duplicate key exception. Commented May 6, 2019 at 20:12
  • @Stennie I edited my question with an example, I would be very thankful if you can shed some light on that. Commented May 11, 2019 at 15:09

1 Answer 1

1

From the source code of the Scala driver at https://github.com/mongodb/mongo-scala-driver/blob/r2.6.0/driver/src/main/scala/org/mongodb/scala/ClientSessionImplicits.scala

It appears that there is an abortTransaction() method defined along with commitTransaction().

In another note, currently a single replica set transaction in MongoDB 4.0 will be automatically aborted if it's not committed within 60 seconds (configurable). In the MongoDB Multi-Document ACID Transactions blog post:

By default, MongoDB will automatically abort any multi-document transaction that runs for more than 60 seconds. Note that if write volumes to the server are low, you have the flexibility to tune your transactions for a longer execution time.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.