When adding @Transactional annotation to a service suspend function which is being called by the handler I get the following error. If I leave it without the annotation then the code works as expected but in case of error it cannot roll back.
either is coming from arrow-kt-core
asHandlerFunction is used as a brigde to be able to document the APIs.
Any idea what happens?
Entities and repositories are placed under io.x.a. The service is inside io.x. The repository scans only io.x.a
Error:
java.lang.IllegalArgumentException: object is not an instance of declaring class
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP POST "/api/assets/130473/one-second-resolution" [ExceptionHandlingWebHandler]
Stack trace:
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at kotlin.reflect.jvm.internal.calls.InlineClassAwareCaller.call(InlineClassAwareCaller.kt:134)
at kotlin.reflect.jvm.internal.KCallableImpl.call(KCallableImpl.kt:108)
at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)
at org.springframework.core.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:377)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable$default(Cancellable.kt:25)
at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:110)
at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:126)
at kotlinx.coroutines.reactor.MonoKt.monoInternal$lambda-2(Mono.kt:90)
at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoUsingWhen.subscribe(MonoUsingWhen.java:87)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCompletionStage.lambda$subscribe$0(MonoCompletionStage.java:82)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at com.github.jasync.sql.db.util.FutureUtilsKt.success(FutureUtils.kt:16)
at com.github.jasync.sql.db.mysql.MySQLConnection$succeedQueryPromise$1.accept(MySQLConnection.kt:344)
at com.github.jasync.sql.db.mysql.MySQLConnection$succeedQueryPromise$1.accept(MySQLConnection.kt:54)
at java.base/java.util.Optional.ifPresent(Optional.java:183)
at com.github.jasync.sql.db.mysql.MySQLConnection.succeedQueryPromise(MySQLConnection.kt:343)
at com.github.jasync.sql.db.mysql.MySQLConnection.onOk(MySQLConnection.kt:218)
at com.github.jasync.sql.db.mysql.codec.MySQLConnectionHandler.channelRead0(MySQLConnectionHandler.kt:119)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:829)
Router:
@Bean
fun router(...): RouterFunction<ServerResponse> {
return route()
.POST(
"...",
asHandlerFunction { createNewOneSecondResolutionSession(it) })
.build()
}
private fun asHandlerFunction(init: suspend (ServerRequest) -> ServerResponse) = HandlerFunction {
mono(Dispatchers.Unconfined) {
init(it)
}
}
Handler:
private suspend fun theFun(req: ServerRequest): ServerResponse {
val a = ...
val b = ...
return service.theFun(a, b).fold(
{ error ->
internalServerErrorResponse("Client user already exists.")
},
{ ServerResponse.ok().bodyValueAndAwait(it) }
)
}
Service:
@Transactional("tm1")
suspend fun theFun(
a: A,
b: B
): Either<Error, Result> = either {
val user= userService.createNewUser(username = "test", password = "pw")
.mapLeft {
log
Error
}
.bind()
throw RuntimeException("xx")
}
Persistence Configuration:
@Configuration
@EnableR2dbcRepositories(
basePackages = ["io.x.a"],
entityOperationsRef = "operations1"
)
class PersistenceConfig1(
@Value("\${spring.datasource.d1.r2dbcUrl}") private val r2dbcUrl: String
) {
@Bean
@Qualifier("d1")
fun connectionFactory(): ConnectionFactory {
return ConnectionFactories.get(ConnectionFactoryOptions.parse(r2dbcUrl))
}
@Bean
fun r2dbcEntityOperations(@Qualifier("d1") connectionFactory: ConnectionFactory): R2dbcEntityOperations {
val databaseClient = DatabaseClient.create(connectionFactory)
return R2dbcEntityTemplate(databaseClient, DefaultReactiveDataAccessStrategy(MySqlDialect.INSTANCE));
}
@Bean("tm1")
fun transactionManager(@Qualifier("d1") connectionFactory: ConnectionFactory): ReactiveTransactionManager {
return R2dbcTransactionManager(connectionFactory)
}
}