kotlin, spring-webflux, spring-data-r2dbc, r2dbc, r2dbc-postgresql

Multiple conditional inserts of a new entity give a duplicate entry error in R2DBC


Consider this function:

@Transactional
fun conditionalInsertEntity(dbEntity: DBEntity): Mono<DBEntity> {
    return fetchObjectByPublicId(dbEntity.publicId)
        .switchIfEmpty {
            r2DatabaseClient.insert()
                .into(DBEntity::class.java)
                .using(Flux.just(dbEntity))
                .fetch()
                .one()
                .map { it["entity_id"] as Long }
                .flatMap { fetchObjectById(it) }
        }
}

When I run the function above with the following driver code, I get duplicate entry errors if the list contains duplicates. I expected no error, because the function checks whether the entity already exists before inserting it.

val result = Flux.fromIterable(listOf(dbEntity1, dbEntity1, dbEntity2))
    .flatMap { conditionalInsertEntity(it) }
    .collectList()
    .block()

Solution

  • The problem was using flatMap instead of concatMap. flatMap subscribes to the inner publishers eagerly, so they can run concurrently, whereas concatMap subscribes to each inner publisher only after the previous one has completed.

    Because I used flatMap, the existence checks for the duplicate entities ran before any insert had completed, so each check found no row in the DB and went on to insert.
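
    The difference can be reproduced without a database. The following is a minimal sketch, not the original code: FakeTable, find, insert, conditionalInsert and insertAll are hypothetical names, the table is an in-memory map, and the 50 ms delay is an assumed stand-in for insert latency. It only assumes reactor-core on the classpath.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical in-memory stand-in for a table with a unique publicId column.
class FakeTable {
    private val rows = ConcurrentHashMap<String, Long>()
    private val nextId = AtomicLong(1)

    // Lookup: emits the stored id, or completes empty when no row exists.
    fun find(publicId: String): Mono<Long> =
        Mono.defer { Mono.justOrEmpty(rows[publicId]) }

    // Insert with simulated latency; fails like a unique-constraint violation
    // when another insert for the same publicId has already landed.
    fun insert(publicId: String): Mono<Long> =
        Mono.delay(Duration.ofMillis(50)).flatMap {
            val id = nextId.getAndIncrement()
            if (rows.putIfAbsent(publicId, id) == null) Mono.just(id)
            else Mono.error<Long>(IllegalStateException("duplicate key: $publicId"))
        }
}

// Same shape as conditionalInsertEntity: look up first, insert only if empty.
fun conditionalInsert(table: FakeTable, publicId: String): Mono<Long> =
    table.find(publicId).switchIfEmpty(Mono.defer { table.insert(publicId) })

// Runs the conditional insert for "a", "a", "b" against a fresh table.
fun insertAll(useConcat: Boolean): Result<List<Long>> {
    val table = FakeTable()
    val keys = Flux.just("a", "a", "b")
    val ids = if (useConcat) {
        keys.concatMap { conditionalInsert(table, it) } // one inner publisher at a time
    } else {
        keys.flatMap { conditionalInsert(table, it) }   // all inner publishers subscribed eagerly
    }
    return runCatching { ids.collectList().block()!! }
}

fun main() {
    println("flatMap:   ${insertAll(useConcat = false)}")
    println("concatMap: ${insertAll(useConcat = true)}")
}
```

    In this sketch the flatMap run should fail with the simulated duplicate-key error, because both lookups for "a" happen before either insert finishes, while the concatMap run should succeed with the ids [1, 1, 2]. Applied to the driver code in the question, the change amounts to replacing .flatMap { conditionalInsertEntity(it) } with .concatMap { conditionalInsertEntity(it) }.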