I have the following query in CoroutineCrudRepository, the database is postgresql of 14.8 version running in docker
@Query("select * from test_table where status = 'NEW' order by creation_date limit :limit for update skip locked")
suspend fun selectForProcessing(@Param("limit") limit: Int): Flow<Task>
I would like to write an integration test that checks that 2 different parallel transactions will select different tasks by this query
I wrote the following test
@Autowired
lateinit var rtm: ReactiveTransactionManager
fun `should select tasks 2 parallel transactions`() = runBlocking {
val def = DefaultTransactionDefinition()
def.setName("trx1")
def.setIsolationLevel(ISOLATION_REPEATABLE_READ)
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW)
val def2 = DefaultTransactionDefinition()
def2.setName("trx2")
def2.setIsolationLevel(ISOLATION_REPEATABLE_READ)
def2.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW)
val trx1 = DelayedTransactionalOperator(rtm, def, 300)
val trx2 = DelayedTransactionalOperator(rtm, def2, 0)
val tasks = async { trx1.executeAndAwait { taskRepo.selectForProcessing(5) } }
val tasks2 = async { trx2.executeAndAwait { taskRepo.selectForProcessing(5) } }
assertNotEquals(tasks.await().toList(), tasks2.await().toList())
}
In order to delay committing of the transactions I created
internal class DelayedTransactionalOperator(
private val transactionManager: ReactiveTransactionManager,
private val transactionDefinition: TransactionDefinition,
private val commitTrxDelay: Long = 0
) : TransactionalOperator {
override fun <T : Any?> execute(action: TransactionCallback<T>): Flux<T> {
println("Started trx ${transactionDefinition.name}")
return TransactionContextManager.currentContext().flatMapMany<T> { context: TransactionContext? ->
Flux.usingWhen<T, ReactiveTransaction>(
this.transactionManager.getReactiveTransaction(this.transactionDefinition),
{ status: ReactiveTransaction -> action.doInTransaction(status) },
{ transaction: ReactiveTransaction -> this.delayedCommit(transaction) },
{ status: ReactiveTransaction, ex: Throwable? -> this.rollbackOnException(status, ex) },
{ transaction: ReactiveTransaction -> this.transactionManager.rollback(transaction) })
.onErrorMap { ex: Throwable -> this.unwrapIfResourceCleanupFailure(ex) }
}
.contextWrite(TransactionContextManager.getOrCreateContext())
.contextWrite(TransactionContextManager.getOrCreateContextHolder())
}
private fun delayedCommit(transaction: ReactiveTransaction): Mono<Void> {
return Mono.delay(Duration.ofMillis(commitTrxDelay)).then(this.transactionManager.commit(transaction)).doOnEach {
println("Committed trx ${transactionDefinition.name}")
}
}
private fun rollbackOnException(status: ReactiveTransaction, ex: Throwable?): Mono<Void> {
return this.transactionManager.rollback(status).onErrorMap { ex2 -> ex2 }
}
private fun unwrapIfResourceCleanupFailure(ex: Throwable): Throwable? {
if (ex is RuntimeException && ex.cause != null) {
val msg = ex.message
if (msg != null && msg.startsWith("Async resource cleanup failed")) {
return ex.cause
}
}
return ex
}
}
In the logs I see that first transaction is committed later than the 2nd one but the queries returns the same entities. Currently I am stuck and don't understand how to write a test properly.
Solved by replacing
val tasks = async { trx1.executeAndAwait { taskRepo.selectForProcessing(5) } }
val tasks2 = async { trx2.executeAndAwait { taskRepo.selectForProcessing(5) } }
with
val tasks = async { taskRepo.selectForProcessing(5).transactional(trx1).toList() }
val tasks2 = async { taskRepo.selectForProcessing(5).transactional(trx2).toList() }