I am new to reactive programming. I would like to make two API calls in parallel, process the results, and return a simple array or list of items.
I have two functions: one returns a Flux and the other returns a Mono. I apply some very simple filtering logic to the items emitted by the Flux, depending on the result of that Mono.
I tried to use zipWith, but only one item made it to the end no matter what the filtering logic was. I also tried block, but that is not allowed inside the controller :/
@GetMapping("/{id}/offers")
fun viewTaskOffers(
    @PathVariable("id") id: String,
    @AuthenticationPrincipal user: UserPrincipal
) : Flux<ViewOfferDTO> {
    data class TaskOfferPair(
        val task: TaskDTO,
        val offer: ViewOfferDTO
    )
    return client.getTaskOffers(id).map {
        it.toViewOfferDTO()
    }.zipWith(client.getTask(id), BiFunction {
        offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
    }).filter {
        it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
    }.map {
        it.offer
    }
}
getTaskOffers returns a Flux of OfferDTO.
getTask returns a Mono of TaskDTO.
If you cannot answer my question, please at least tell me how to make multiple API calls in parallel and wait for the results with WebClient.
Here is a use case for a parallel call.
public Mono<UserInfo> fetchCarrierUserInfo(User user) {
    Mono<UserInfo> userInfoMono = fetchUserInfo(user.getGuid());
    Mono<CarrierInfo> carrierInfoMono = fetchCarrierInfo(user.getCarrierGuid());
    return Mono.zip(userInfoMono, carrierInfoMono).map(tuple -> {
        UserInfo userInfo = tuple.getT1();
        userInfo.setCarrier(tuple.getT2());
        return userInfo;
    });
}
Here:
fetchUserInfo makes an HTTP call to get user info from another service and returns a Mono.
fetchCarrierInfo makes an HTTP call to get carrier info from another service and returns a Mono.
Mono.zip() merges the given Monos into a new Mono that is fulfilled when all of the given Monos have produced an item, aggregating their values into a Tuple2.
Then call fetchCarrierUserInfo().block() to get the final result.
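For the Flux-plus-Mono case in the question, the same idea could be sketched as follows. Zipping a Flux with a Mono pairs elements one-to-one, so at most one pair comes out, which would explain why only one offer survived. One way around it is to collect the Flux into a list first and zip that with the Mono. This is only a minimal sketch: the Task and Offer records, the OfferFilterSketch class, and the visibleOffers method are hypothetical stand-ins for the question's TaskDTO, ViewOfferDTO, and controller logic, and it assumes Java 16+ with Reactor on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OfferFilterSketch {

    // Hypothetical stand-ins for the question's TaskDTO and ViewOfferDTO.
    record Task(String creatorUserId) {}
    record Offer(String workerUserId) {}

    // Collect the whole Flux into a Mono<List<Offer>>, then zip it with the
    // single task. Mono.zip subscribes to both sources, so two asynchronous
    // calls can be in flight at the same time.
    static Mono<List<Offer>> visibleOffers(Flux<Offer> offers, Mono<Task> task, String userId) {
        return Mono.zip(offers.collectList(), task)
                .map(tuple -> {
                    List<Offer> all = tuple.getT1();
                    Task t = tuple.getT2();
                    // Same rule as in the question: keep an offer if the user
                    // made it, or if the user created the task.
                    return all.stream()
                            .filter(o -> o.workerUserId().equals(userId)
                                    || t.creatorUserId().equals(userId))
                            .toList();
                });
    }

    public static void main(String[] args) {
        Flux<Offer> offers = Flux.just(new Offer("alice"), new Offer("bob"), new Offer("alice"));
        Mono<Task> task = Mono.just(new Task("carol"));
        // block() is fine in a main method or a test, but not on a WebFlux request thread.
        List<Offer> result = visibleOffers(offers, task, "alice").block();
        System.out.println(result.size()); // prints 2
    }
}
```

In a WebFlux controller you would return the Mono itself rather than calling block(). If the endpoint has to keep returning a Flux, appending .flatMapMany(Flux::fromIterable) to the result should convert the list back into one.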