Let's consider this function
@Transactional
fun conditionalInsertEntity(dbEntity: DBEntity): Mono<DBEntity> {
return fetchObjectByPublicId(dbEntity.publicId)
.switchIfEmpty {
r2DatabaseClient.insert()
.into(DBEntity::class.java)
.using(Flux.just(dbEntity))
.fetch()
.one()
.map { it["entity_id"] as Long }
.flatMap { fetchObjectById(it) }
}
}
while running above function with following driver code I get duplicate entry errors if the list contains duplicates. Ideally it shouldn't give that error because the above function is already handling the case for duplicate inserts!!
val result = Flux.fromIterable(listOf(dbEntity1, dbEntity1, dbEntity2))
.flatMap { conditionalInsertEntity(it) }
.collectList()
.block()
Realized that this is an issue of using flatMap instead of concatMap. ConcatMap collects the result from individual publishers sequentially unlike flatMap. (more here)
Because I used flatMap, multiple publishers thought that the entity isn't already available in the DB