Using non-blocking calls I want to generically take a Mono and call a method that returns a Flux and for each item in the Flux, call a method that returns Mono to return a Flux which is a an aggregate object of Bar + Foo + Bar and has as many elements as the Flux method returns (will return).
As a concrete example: Methods:
Flux<Bar> getBarsByFoo(Foo foo);
Mono<More> getMoreByBar(Bar bar);
Combined getCombinedFrom(Bar bar, Foo foo, More more);
Working code section:
Flux<Combined> getCombinedByFoo(Foo foo) {
getBarsByFoo(foo)...
}
From a blocking perspective what I want to accomplish is:
List<Combined> getCombinedByFoo(Foo foo) {
List<Bar> bars = getBarsByFoo(foo):
List<Combined> combinedList = new ArrayList<>(bars.size());
for (Bar bar: bars) {
More more = getMoreByBar(bar);
combinedList.append(getCombinedFrom(bar, foo, more));
}
return combinedList;
}
Any help on which Flux and Mono methods to use would be appreciated. I am still learning to change my brain into non-blocking thinking. Conceptually, I think there is a function to apply to each element (Bar) in from getBarsByFoo(Foo foo) to somehow map that to the combined element...
I like to think about Reactor programming as a flow of operations (as in flow programming), as a chain/DAG of operation.
In your case, you want to:
The following example show a reactive version of your method (for a less verbose version, see Toerktumlare answer:
Flux<Combined> combine(Foo foo) {
Flux<Bar> bars = getBarBy(foo);
Flux<Combined> result = bars.flatMap(bar -> {
Mono<More> nextMore = getMoreBy(bar);
Mono<Combined> next = nextMore.map(more -> getCombinedFrom(foo, bar, more));
return next;
);
return result;
}
If you get your foo object through a Mono, you can just call flatMapMany
on it:
Mono<Foo> nextFoo = ...;
Flux<Combined> = nextFoo.flatMapMany(foo -> combine(foo));
flatMap is very powerful: it can trigger concurrent execution of the provided operation. In your case, it means that many getMoreBy(bar)
operations can be launched at the same time. But it is a double-edged sword, because then it means that:
The concurrency behavior is quite high by default (256) and can be controlled in different ways: