1

When adding @Transactional annotation to a service suspend function which is being called by the handler I get the following error. If I leave it without the annotation then the code works as expected but in case of error it cannot roll back.

either is coming from arrow-kt-core

asHandlerFunction is used as a brigde to be able to document the APIs.

Any idea what happens?

Entities and repositories are placed under io.x.a. The service is inside io.x. The repository scans only io.x.a

Error:

java.lang.IllegalArgumentException: object is not an instance of declaring class
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
    |_ checkpoint ⇢ HTTP POST "/api/assets/130473/one-second-resolution" [ExceptionHandlingWebHandler]
Stack trace:
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at kotlin.reflect.jvm.internal.calls.InlineClassAwareCaller.call(InlineClassAwareCaller.kt:134)
        at kotlin.reflect.jvm.internal.KCallableImpl.call(KCallableImpl.kt:108)
        at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)
        at org.springframework.core.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:377)
        at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
        at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable$default(Cancellable.kt:25)
        at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:110)
        at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:126)
        at kotlinx.coroutines.reactor.MonoKt.monoInternal$lambda-2(Mono.kt:90)
        at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoUsingWhen.subscribe(MonoUsingWhen.java:87)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        at reactor.core.publisher.MonoCompletionStage.lambda$subscribe$0(MonoCompletionStage.java:82)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
        at com.github.jasync.sql.db.util.FutureUtilsKt.success(FutureUtils.kt:16)
        at com.github.jasync.sql.db.mysql.MySQLConnection$succeedQueryPromise$1.accept(MySQLConnection.kt:344)
        at com.github.jasync.sql.db.mysql.MySQLConnection$succeedQueryPromise$1.accept(MySQLConnection.kt:54)
        at java.base/java.util.Optional.ifPresent(Optional.java:183)
        at com.github.jasync.sql.db.mysql.MySQLConnection.succeedQueryPromise(MySQLConnection.kt:343)
        at com.github.jasync.sql.db.mysql.MySQLConnection.onOk(MySQLConnection.kt:218)
        at com.github.jasync.sql.db.mysql.codec.MySQLConnectionHandler.channelRead0(MySQLConnectionHandler.kt:119)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.base/java.lang.Thread.run(Thread.java:829)


Router:

    @Bean
    fun router(...): RouterFunction<ServerResponse> {
        return route()
            .POST(
                "...",
                asHandlerFunction { createNewOneSecondResolutionSession(it) })
            .build()
    }

    private fun asHandlerFunction(init: suspend (ServerRequest) -> ServerResponse) = HandlerFunction {
        mono(Dispatchers.Unconfined) {
            init(it)
        }
    }

Handler:

    private suspend fun theFun(req: ServerRequest): ServerResponse {
        val a = ...

        val b = ...

        return service.theFun(a, b).fold(
            { error ->
                internalServerErrorResponse("Client user already exists.")
            },
            { ServerResponse.ok().bodyValueAndAwait(it) }
        )
    }

Service:

    @Transactional("tm1")
    suspend fun theFun(
        a: A,
        b: B
    ): Either<Error, Result> = either {
        val user= userService.createNewUser(username = "test", password = "pw")
            .mapLeft {
                log
                Error
            }
            .bind()
            throw RuntimeException("xx")
    }

Persistence Configuration:

@Configuration
@EnableR2dbcRepositories(
    basePackages = ["io.x.a"],
    entityOperationsRef = "operations1"
)
class PersistenceConfig1(
    @Value("\${spring.datasource.d1.r2dbcUrl}") private val r2dbcUrl: String
) {

    @Bean
    @Qualifier("d1")
    fun connectionFactory(): ConnectionFactory {
        return ConnectionFactories.get(ConnectionFactoryOptions.parse(r2dbcUrl))
    }

    @Bean
    fun r2dbcEntityOperations(@Qualifier("d1") connectionFactory: ConnectionFactory): R2dbcEntityOperations {
        val databaseClient = DatabaseClient.create(connectionFactory)

        return R2dbcEntityTemplate(databaseClient, DefaultReactiveDataAccessStrategy(MySqlDialect.INSTANCE));
    }

    @Bean("tm1")
    fun transactionManager(@Qualifier("d1") connectionFactory: ConnectionFactory): ReactiveTransactionManager {
        return R2dbcTransactionManager(connectionFactory)
    }
}

1 Answer 1

2

I found a solution using TransactionalOperator.

    @Bean("transactionalOperator1")
    fun transactionalOperator1(@Qualifier("tm1") transactionManager: ReactiveTransactionManager): TransactionalOperator {
        return TransactionalOperator.create(transactionManager)
    }

    @Bean("transactionalOperator2")
    fun transactionalOperator2(@Qualifier("tm2") transactionManager: ReactiveTransactionManager): TransactionalOperator {
        return TransactionalOperator.create(transactionManager)
    }
    @Qualifier("transactionalOperator1") private val transactionalOperator1: TransactionalOperator,
    @Qualifier("transactionalOperator2") private val transactionalOperator2: TransactionalOperator,

    suspend fun theFun(...): Either<Error, Result> = 
        transactionalOperator1.executeAndAwait { t1->
            transactionalOperator2.executeAndAwait { t2->
                ...
                    .handleErrorWith {
                        t1.setRollbackOnly()
                        t2.setRollbackOnly()
                    }
            }
        }!!

At the end of executeAndAwait it commits the code. If the rollbackOnly flag is set then it will instruct the ReactiveTransactionManager to roll back the changes.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.