Search code examples
spring-bootkotlinspring-webfluxproject-reactorspring-webclient

How to make multiple Spring Webclient calls in parallel and wait for the result?


I am new to Reactive programming and I would like to make two API calls in parallel and process the results and return a simple array or list of items.

I have two functions, one returns a Flux and the other returns a Mono and I make a very simple filtering logic on the Flux emitted items depending on the result of that Mono.

I tried to use zipWith but only one item made it to the end no matter what filtering logic. Also I tried with block but that is not allowed inside the controller :/

@GetMapping("/{id}/offers")
fun viewTaskOffers(
        @PathVariable("id") id: String,
        @AuthenticationPrincipal user: UserPrincipal
) : Flux<ViewOfferDTO> {
    data class TaskOfferPair(
        val task: TaskDTO,
        val offer: ViewOfferDTO
    )

    return client.getTaskOffers(id).map {
            it.toViewOfferDTO()
        }.zipWith(client.getTask(id), BiFunction {
            offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
        }).filter {
            it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
        }.map {
            it.offer
        }
}
  • getTaskOffers returns a Flux of OfferDTO
  • getTask returns a Mono of TaskDTO

If you cannot answer my question please tell me atleast how to do multiple API calls in parallel and wait for the results in WebClient


Solution

  • Here is a use case for a parallel call.

    public Mono<UserInfo> fetchCarrierUserInfo(User user) {
            Mono<UserInfo> userInfoMono = fetchUserInfo(user.getGuid());
            Mono<CarrierInfo> carrierInfoMono = fetchCarrierInfo(user.getCarrierGuid());
    
            return Mono.zip(userInfoMono, carrierInfoMono).map(tuple -> {
                UserInfo userInfo = tuple.getT1();
                userInfo.setCarrier(tuple.getT2());
                return userInfo;
            });
        }
    

    Here:

    • fetchUserInfo makes http call to get user info from another service and returns Mono
    • fetchCarrierInfo method makes HTTP call to get carrierInfo from another service and returns Mono
    • Mono.zip() merges given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple2.

    Then, call fetchCarrierUserInfo().block() it to get the final result.