I have an issue when trying to concurrently select rows in a PostgreSQL database using the R2DBC driver.
I have a method that should create an entity with an order number based on the total number of these entities in a specified zone.
val operator = TransactionalOperator.create(
    transactionManager,
    DefaultTransactionDefinition().apply {
        isolationLevel = TransactionDefinition.ISOLATION_REPEATABLE_READ
        propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRES_NEW
    }
)
operator.executeAndAwait {
    // Lock and load the zone's existing rows; the Flow has to be collected before it can be counted
    val existingEntities = repository.findByZoneWithLock(id).toList()
    val entity = TestEntity()
    // The new order number is the current row count plus one
    entity.order = existingEntities.size + 1
    repository.save(entity)
}
The repository used by the service, with the locking query:
interface EntityRepository : CoroutineCrudRepository<TestEntity, String> {
    @Query("Select * from TestEntity where zone = :zone for update")
    fun findByZoneWithLock(zone: String): Flow<TestEntity>
}
When executing 2 concurrent requests that invoke my method, I expect the following behaviour:
0) We have 1 pre-created entity with order 1 in the DB
- One of the transactions gets 1 entity and locks the existing entities
- The second transaction waits for the first transaction to release the lock
- The first transaction creates a new entity, sets its order to 2 (based on the number of existing entities), saves it and finally commits
- The second transaction gets the updated number of entities from the DB, with size = 2
- The second transaction creates an entity with order 3 and commits
This is the usual behavior when we execute these actions in the DB directly with SQL.
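To make the expected sequence concrete, here is a minimal in-memory sketch of it. It does not touch PostgreSQL or R2DBC: `ZoneTable`, `createNext` and `simulateExpected` are hypothetical names invented for the illustration, and a plain `ReentrantLock` stands in for the lock I expect the locking query to hold until commit.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

// Hypothetical stand-in for the rows of one zone; it stores only the order values.
class ZoneTable(initial: List<Int>) {
    private val rows = initial.toMutableList()
    private val lock = ReentrantLock() // assumption: models a lock held until "commit"

    // Models one transaction: take the lock, count the rows, insert count + 1, release.
    fun createNext(): Int = lock.withLock {
        val order = rows.size + 1
        rows.add(order)
        order
    }

    fun orders(): List<Int> = lock.withLock { rows.sorted() }
}

fun simulateExpected(): List<Int> {
    val table = ZoneTable(listOf(1)) // step 0: one pre-created entity with order 1
    val workers = List(2) { thread { table.createNext() } } // two concurrent requests
    workers.forEach { it.join() }
    return table.orders()
}

fun main() {
    println(simulateExpected()) // prints [1, 2, 3]
}
```

Whichever worker gets the lock first inserts order 2, and the other one only counts the rows after that insert, so it inserts order 3.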
However, as a result I see 3 rows: one with order = 1 (pre-created) and two others, both with order = 2. For some reason, at the second step the second transaction didn't wait for the lock and got the 1 pre-created row as well. I've tried changing the isolation level, but it didn't help (Serializable just throws an exception in the second transaction). I've also tried replacing TransactionalOperator with @Transactional, and replacing the @Query "select for update" with @Lock(LockMode.PESSIMISTIC_WRITE), with the same result.
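The result I actually get matches an interleaving in which both transactions count the rows before either of them inserts. Below is a minimal in-memory sketch of that interleaving, assuming nothing about the driver: `simulateObserved` is a hypothetical name, and the `CyclicBarrier` is there only to force both reads to happen before the first write. It models the outcome, not the cause.

```kotlin
import java.util.concurrent.CyclicBarrier
import kotlin.concurrent.thread

fun simulateObserved(): List<Int> {
    val rows = mutableListOf(1)           // one pre-created entity with order 1
    val bothHaveRead = CyclicBarrier(2)   // forces both reads to finish before any write
    val workers = List(2) {
        thread {
            // Each "transaction" counts the rows and sees only the pre-created one.
            val seen = synchronized(rows) { rows.size }
            bothHaveRead.await()
            // Both then insert with the same order number, seen + 1 = 2.
            synchronized(rows) { rows.add(seen + 1) }
        }
    }
    workers.forEach { it.join() }
    return synchronized(rows) { rows.sorted() }
}

fun main() {
    println(simulateObserved()) // prints [1, 2, 2]
}
```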
What can be the reason for such behavior? I am new to R2DBC and coroutine repositories, but with ordinary non-reactive JDBC it works as expected. Maybe I am not considering some driver configuration or cache.
Postgres version: 16
Spring-boot-starter-data-r2dbc version: 3.3.0