Search code examples
spring-webfluxkotlin-coroutinesspring-data-r2dbcr2dbcr2dbc-postgresql

How to test SELECT FOR UPDATE with R2DBC postgresql and Webflux


I have the following query in CoroutineCrudRepository, the database is postgresql of 14.8 version running in docker

@Query("select * from test_table where status = 'NEW' order by creation_date limit :limit for update skip locked")
suspend fun selectForProcessing(@Param("limit") limit: Int): Flow<Task>

I would like to write an integration test that checks that 2 different parallel transactions will select different tasks by this query

I wrote the following test

@Autowired
lateinit var rtm: ReactiveTransactionManager

fun `should select tasks 2 parallel transactions`() = runBlocking {
        val def = DefaultTransactionDefinition()
        def.setName("trx1")
        def.setIsolationLevel(ISOLATION_REPEATABLE_READ)
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW)
        val def2 = DefaultTransactionDefinition()
        def2.setName("trx2")
        def2.setIsolationLevel(ISOLATION_REPEATABLE_READ)
        def2.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW)

        val trx1 = DelayedTransactionalOperator(rtm, def, 300)
        val trx2 = DelayedTransactionalOperator(rtm, def2, 0)

        val tasks = async {  trx1.executeAndAwait { taskRepo.selectForProcessing(5) } }
        val tasks2 = async {  trx2.executeAndAwait { taskRepo.selectForProcessing(5) } }

        assertNotEquals(tasks.await().toList(), tasks2.await().toList())
    }

In order to delay committing of the transactions I created

internal class DelayedTransactionalOperator(
            private val transactionManager: ReactiveTransactionManager,
            private val transactionDefinition: TransactionDefinition,
            private val commitTrxDelay: Long = 0
    ) : TransactionalOperator {
        override fun <T : Any?> execute(action: TransactionCallback<T>): Flux<T> {
            println("Started trx ${transactionDefinition.name}")
            return TransactionContextManager.currentContext().flatMapMany<T> { context: TransactionContext? ->
                Flux.usingWhen<T, ReactiveTransaction>(
                        this.transactionManager.getReactiveTransaction(this.transactionDefinition),
                        { status: ReactiveTransaction -> action.doInTransaction(status) },
                        { transaction: ReactiveTransaction -> this.delayedCommit(transaction) },
                        { status: ReactiveTransaction, ex: Throwable? -> this.rollbackOnException(status, ex) },
                        { transaction: ReactiveTransaction -> this.transactionManager.rollback(transaction) })
                        .onErrorMap { ex: Throwable -> this.unwrapIfResourceCleanupFailure(ex) }
            }
                    .contextWrite(TransactionContextManager.getOrCreateContext())
                    .contextWrite(TransactionContextManager.getOrCreateContextHolder())
        }

        private fun delayedCommit(transaction: ReactiveTransaction): Mono<Void> {
            return Mono.delay(Duration.ofMillis(commitTrxDelay)).then(this.transactionManager.commit(transaction)).doOnEach {
                println("Committed trx ${transactionDefinition.name}")
            }
        }

        private fun rollbackOnException(status: ReactiveTransaction, ex: Throwable?): Mono<Void> {
            return this.transactionManager.rollback(status).onErrorMap { ex2 -> ex2 }
        }

        private fun unwrapIfResourceCleanupFailure(ex: Throwable): Throwable? {
            if (ex is RuntimeException && ex.cause != null) {
                val msg = ex.message
                if (msg != null && msg.startsWith("Async resource cleanup failed")) {
                    return ex.cause
                }
            }
            return ex
        }
    }

In the logs I see that first transaction is committed later than the 2nd one but the queries returns the same entities. Currently I am stuck and don't understand how to write a test properly.


Solution

  • Solved by replacing

    val tasks = async {  trx1.executeAndAwait { taskRepo.selectForProcessing(5) } }
    val tasks2 = async {  trx2.executeAndAwait { taskRepo.selectForProcessing(5) } }
    

    with

    val tasks = async { taskRepo.selectForProcessing(5).transactional(trx1).toList() }
    val tasks2 = async { taskRepo.selectForProcessing(5).transactional(trx2).toList() }