spring kotlin spring-webflux project-reactor chaining

Chaining multiple publishers transforming the responses


I want to chain a Mono after each Flux event. The Mono publisher needs information from each event published by the Flux. The result should be a Flux that combines the data of each Flux event with the corresponding Mono response.

After some digging, I ended up with a map inside a flatMap. The code looks like this:

override fun searchPets(petSearch: PetSearch): Flux<Pet> {
    return petRepository
        .searchPets(petSearch) // returns Flux<Pet>
        .flatMap { pet ->
            petService
                .getCollarForMyPet() // returns Mono<Collar>
                .map { collar -> PetConverter.addCollarToPet(pet, collar) } // returns Pet (now with a collar)
        }
}

My main concerns are:

  • Is using a map inside a flatMap a code smell?
  • Will the contents of the pet variable suffer from race conditions when multiple Flux events, and the corresponding Mono events, arrive concurrently?
  • Is there a better way to approach this kind of behaviour?

Solution

  • This approach is perfectly fine.

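    The Reactive Streams specification mandates that onNext events don't overlap, so there won't be an issue with race conditions. Each invocation of the flatMap lambda also captures its own pet, so the inner map always sees the pet that triggered that particular call.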

    flatMap does introduce concurrency, though, so multiple calls to the PetService can be in flight at the same time. This shouldn't be an issue unless searchPets emits the same instance of Pet twice, in which case two concurrent calls could end up working on that one instance.

    Note that, because of this concurrency, flatMap can reorder pets in this scenario. Imagine the search returns petA and then petB, but the petService call for petA takes longer. In the output of the flatMap, petB would be emitted first (with its collar set), followed by petA.
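
The reordering described above can be reproduced with a small sketch. Everything here is a hypothetical stand-in for the code in the question: the Pet and Collar data classes, getCollarFor, and the artificial delays are assumptions made only for illustration. The sketch also shows flatMapSequential and concatMap, two Reactor operators the answer does not mention, as possible options if the source order has to be preserved.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical stand-ins for the question's domain types.
data class Collar(val id: String)
data class Pet(val name: String, val collar: Collar? = null)

// Hypothetical collar lookup: petA is deliberately slow, every other pet is fast.
fun getCollarFor(pet: Pet): Mono<Collar> {
    val delay = if (pet.name == "petA") Duration.ofMillis(200) else Duration.ofMillis(20)
    return Mono.just(Collar("collar-for-${pet.name}")).delayElement(delay)
}

// Returns a new Pet instead of mutating the one captured by the lambda
// (an assumption; the real PetConverter may behave differently).
fun addCollarToPet(pet: Pet, collar: Collar): Pet = pet.copy(collar = collar)

// Hypothetical search result: petA is emitted before petB.
fun search(): Flux<Pet> = Flux.just(Pet("petA"), Pet("petB"))

// flatMap: inner Monos are subscribed eagerly and results are emitted as they arrive.
fun withFlatMap(): Flux<Pet> =
    search().flatMap { pet -> getCollarFor(pet).map { collar -> addCollarToPet(pet, collar) } }

// flatMapSequential: inner Monos still run concurrently, but results are
// buffered and emitted in the order of the source elements.
fun withFlatMapSequential(): Flux<Pet> =
    search().flatMapSequential { pet -> getCollarFor(pet).map { collar -> addCollarToPet(pet, collar) } }

// concatMap: inner Monos are subscribed one at a time, so order is kept
// at the cost of losing the concurrency.
fun withConcatMap(): Flux<Pet> =
    search().concatMap { pet -> getCollarFor(pet).map { collar -> addCollarToPet(pet, collar) } }

fun main() {
    // block() is used only to keep the demo short; avoid it in real WebFlux code.
    println(withFlatMap().map { it.name }.collectList().block())
    println(withFlatMapSequential().map { it.name }.collectList().block())
    println(withConcatMap().map { it.name }.collectList().block())
}
```

With the simulated delays, withFlatMap should emit petB before petA, while the other two variants keep the source order. In every variant each pet ends up paired with its own collar, because the inner map closes over the pet of its own flatMap invocation.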