There is an example of Scala mongodb transaction:
But it's not clear how to rollback the transaction in case of failure.
Here is the code I copied from official example but modified a bit to make the transaction fail in the second insertion (inserting 2 documents with same ids), but problem is that the first document is persisted, and I need the WHOLE transaction to be rolled back.
import org.mongodb.scala._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
object Application extends App {
val mongoClient: MongoClient = MongoClient("mongodb://localhost:27018")
val database = mongoClient.getDatabase("hr")
val employeesCollection = database.getCollection("employees")
// Implicit functions that execute the Observable and return the results
val waitDuration = Duration(5, "seconds")
implicit class ObservableExecutor[T](observable: Observable[T]) {
def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
}
implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
def execute(): T = Await.result(observable.toFuture(), waitDuration)
}
updateEmployeeInfoWithRetry(mongoClient).execute()
Thread.sleep(3000)
/// -------------------------
def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
observable.map(clientSession => {
val eventsCollection = database.getCollection("events")
val transactionOptions = TransactionOptions.builder().readConcern(ReadConcern.SNAPSHOT).writeConcern(WriteConcern.MAJORITY).build()
clientSession.startTransaction(transactionOptions)
eventsCollection.insertOne(clientSession, Document("_id" -> "123", "employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
.subscribe((res: Completed) => println(res))
// THIS SHOULD FAIL, SINCE THERE IS ALREADY DOCUMENT WITH ID = 123, but PREVIOUS OPERATION SHOULD BE ALSO ROLLED BACK.
// I COULD NOT FIND THE WAY HOW TO ROLLBACK WHOLE TRANSACTION IF ONE OF OPERATIONS FAILED
eventsCollection.insertOne(clientSession, Document("_id" -> "123", "employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
.subscribe((res: Completed) => println(res))
// I'VE TRIED VARIOUS THINGS (INCLUDING CODE BELOW)
// .subscribe(new Observer[Completed] {
// override def onNext(result: Completed): Unit = println("onNext")
//
// override def onError(e: Throwable): Unit = clientSession.abortTransaction()
//
// override def onComplete(): Unit = println("complete")
// })
clientSession
})
}
def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
println("UnknownTransactionCommitResult, retrying commit operation ...")
commitAndRetry(observable)
}
case e: Exception => {
println(s"Exception during commit ...: $e")
throw e
}
})
}
def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("TransientTransactionError, aborting transaction and retrying ...")
runTransactionAndRetry(observable)
}
})
}
def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {
val database = client.getDatabase("hr")
val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
val commitTransactionObservable: SingleObservable[Completed] =
updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
runTransactionAndRetry(commitAndRetryObservable)
}
}
How to rollback the whole transaction if any operation failed?
_idindex is always unique, so if there are two independent inserts with the same_idonly the first one will succeed. It sounds unusual to be creating documents with duplicate keys in the same transaction, however if any operation in a transaction fails the transaction will abort and discard all data changes made in the transaction. Transaction commit/rollback support is a server feature in MongoDB 4.0+; your application does not explicitly rollback individual changes._idvalue: the first will succeed and subsequent attempts will fail with a duplicate key exception.