spring-webflux, project-reactor, reactive-streams

Combining the result of a Flux with the result of a Mono


I started using Project Reactor, and one place where I'm struggling a little is how to combine values coming from a Mono with values coming from a Flux. Here's my use case:

public interface GroupRepository {
       Mono<GroupModel> getGroup(Long groupId);
}

public interface UserRepository {
       Flux<User> getUsers(Set<Long> userIds);   
}

Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(userIds);
// Run the two calls above in parallel and associate the users with the group.

What I want to achieve is this: how can I combine the response from userFlux and associate those users with the group, with something like group.addUsers(usersFromFlux)?

Can someone help with how to combine the results coming from userFlux and groupMono? I think I could use something like zip, but it does a one-to-one mapping between the sources. In my case I need a 1-to-N mapping: I have one group but multiple users that I need to add to that group. Is it a good idea to return Mono<List<User>> and then use the zip operator with both Monos, providing a combinator as mentioned here?
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, final BiFunction<? super T1, ? super T2, ? extends O> combinator)
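
For reference, the collect-then-zip idea described above might look roughly like the sketch below. It assumes minimal stand-in versions of User and GroupModel (the real classes are not shown in the question), and the class name ZipGroupWithUsers and the method groupWithUsers are hypothetical. collectList() turns the N users into a single Mono<List<User>>, so Mono.zip pairs one group with one list.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ZipGroupWithUsers {
    // Hypothetical stand-in for the question's User type.
    static class User {
        final long id;
        User(long id) { this.id = id; }
    }

    // Hypothetical stand-in for the question's GroupModel type.
    static class GroupModel {
        final long id;
        final List<User> users = new ArrayList<>();
        GroupModel(long id) { this.id = id; }
        void addUsers(List<User> toAdd) { users.addAll(toAdd); }
    }

    static Mono<GroupModel> groupWithUsers(Mono<GroupModel> groupMono, Flux<User> userFlux) {
        // Mono.zip subscribes to both sources and waits until each has produced its value.
        return Mono.zip(groupMono, userFlux.collectList(), (group, users) -> {
            group.addUsers(users);
            return group;
        });
    }

    public static void main(String[] args) {
        GroupModel group = groupWithUsers(
                Mono.just(new GroupModel(1L)),
                Flux.just(new User(10L), new User(11L), new User(12L)))
            .block(); // block() is only for this demo; avoid it in reactive code paths
        System.out.println(group.users.size()); // prints 3
    }
}
```

Whether the two calls actually overlap in time depends on the repositories being asynchronous; zip only guarantees that both are subscribed to and that the combinator runs once both results are available.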


Solution

  • I think the static method Flux.combineLatest could help you here: since your Mono only ever emits one element, that element will always be the one combined with each incoming value from the Flux.

    Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
                       groupMono, userFlux);
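
    A fuller version of the snippet above, as a minimal sketch: User, GroupModel and Combination are hypothetical stand-ins (the answer does not define Combination), and the class name CombineLatestExample and the method combine are made up for illustration. It uses the two-source overload of Flux.combineLatest that takes a BiFunction, which avoids the casts from Object[].

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CombineLatestExample {
    // Hypothetical stand-ins for the types used in the question and answer.
    static class User {
        final long id;
        User(long id) { this.id = id; }
    }

    static class GroupModel {
        final long id;
        GroupModel(long id) { this.id = id; }
    }

    static class Combination {
        final GroupModel group;
        final User user;
        Combination(GroupModel group, User user) {
            this.group = group;
            this.user = user;
        }
    }

    static Flux<Combination> combine(Mono<GroupModel> groupMono, Flux<User> userFlux) {
        // Emits one Combination per user once the group value is known.
        return Flux.combineLatest(groupMono, userFlux,
                (group, user) -> new Combination(group, user));
    }

    public static void main(String[] args) {
        List<Combination> pairs = combine(
                Mono.just(new GroupModel(1L)),
                Flux.just(new User(10L), new User(11L), new User(12L)))
            .collectList()
            .block(); // block() is only for this demo
        System.out.println(pairs.size());
    }
}
```

    One caveat worth keeping in mind: combineLatest only retains the latest value from each source, so if the Flux emits several users before the Mono has produced the group, the earlier users are not combined. If every user must be attached to the group, collecting the users into a list first may be the safer option.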