I have a list of the following objects, each with a method returning the reactive type Mono<?>:
interface GuyWithReactiveReturnTypeMethod {
    Mono<String> execute();
}
class ReactiveGuysInvocator {
    Mono<String> executeAllGuys(List<GuyWithReactiveReturnTypeMethod> guysToInvoke) {
        ???
    }
}
And I need to invoke all the guys one by one (the n-th guy's result is the (n+1)-th guy's argument), but I'm not sure how to iterate over such a list.
I thought of flatMapping the next guy in a while loop:
public interface GuyWithReactiveReturnTypeMethod {
    Mono<String> execute(String string);
}
class ReactiveGuysInvocator {
    Mono<String> executeAllGuys(List<GuyWithReactiveReturnTypeMethod> guysToExecute) {
        ListIterator<GuyWithReactiveReturnTypeMethod> iterator = guysToExecute.listIterator();
        Mono<String> currentResult = Mono.just("start");
        while (iterator.hasNext()) {
            GuyWithReactiveReturnTypeMethod guyToInvoke = iterator.next();
            currentResult = currentResult.flatMap(guyToInvoke::execute)
                    .doOnNext(object -> System.out.println("Executed!"))
                    .doOnError(error -> System.out.println("Error"));
        }
        return currentResult;
    }
}
But this approach seems to be completely incorrect. Does anyone know how I could implement something like this?
UPDATE: flatMap can easily be abused. Make sure that you are doing asynchronous work when using flatMap. It seems to me that, most of the time, you can do pretty well with a minimum of Mono.just calls.
flatMap is what you have to use with the constraints you provide.
executeAllGuys(Arrays.asList(new GuyWithReactiveReturnTypeMethod[] {
        (s)->Mono.just(s+"1"),
        (s)->Mono.just(s+"2"),
        (s)->Mono.just(s+"3")}))
    .subscribe(System.out::println);

Mono<String> executeAllGuys(List<GuyWithReactiveReturnTypeMethod> guysToExecute) {
    // your flow is starting here
    Mono<String> stringMono = Mono.just("start");
    for (GuyWithReactiveReturnTypeMethod guyToInvoke : guysToExecute) {
        stringMono = stringMono.flatMap(guyToInvoke::execute);
    }
    return stringMono;
}
Just look at all those Mono.just calls. Why do you want to create N+1 flows to do the job? The real problem is that you're creating a new flow every time you execute the interface method. flatMap stops the current flow and starts a new one with the publisher returned by the flatMap method. Try to think reactive and treat the whole business like a stream. There is no flatMap in Streams. A reactive execution should be done on only a single flow.
A Mono<String> execute(String string) is not a reactive component. It is a reactive producer. A Mono<String> execute(Mono<String> string) is a reactive component.
Make your interface more reactive by taking a Mono in and returning a Mono. Your application is then doing a map conversion at each step. This is "chaining reactive components".
executeAllGuys(Arrays.asList(new GuyWithReactiveReturnTypeMethod[] {
        (s)->s.map(str->str+"1"),
        (s)->s.map(str->str+"2"),
        (s)->s.map(str->str+"3")}))
    .subscribe(System.out::println);

Mono<String> executeAllGuys(List<GuyWithReactiveReturnTypeMethod> guysToExecute) {
    // your flow is starting here
    Mono<String> stringMono = Mono.just("start");
    for (GuyWithReactiveReturnTypeMethod guyToInvoke : guysToExecute) {
        stringMono = guyToInvoke.execute(stringMono);
    }
    return stringMono;
}

interface GuyWithReactiveReturnTypeMethod {
    Mono<String> execute(Mono<String> string);
}
Alternatively, make your interface less reactive but your application more reactive by using a Flux instead of a list. You will then have to use reduce to convert the Flux to a Mono. Your application is doing a Map/Reduce function. I don't think a Flux will guarantee execution order of the elements in the flow, but it could run executeAllGuys more efficiently.
// your flow is starting here
// the explicit type argument gives the lambdas a target type
executeAllGuys(Flux.<GuyWithReactiveReturnTypeMethod>just(
        (s)->s+"1",
        (s)->s+"2",
        (s)->s+"3"))
    .subscribe(System.out::println);

Mono<String> executeAllGuys(Flux<GuyWithReactiveReturnTypeMethod> guysToExecute) {
    return guysToExecute.reduce("start", (str, guyToInvoke)->guyToInvoke.execute(str));
}

interface GuyWithReactiveReturnTypeMethod {
    String execute(String string);
}
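Once the interface is the plain String execute(String string) from the last variant, the steps can also be combined before any flow exists and then applied in a single map on one flow. Below is a minimal sketch of that idea using only the JDK. The class name GuyComposer and the method compose are made up for illustration, and the commented-out Mono line assumes reactor-core is on the classpath.

```java
import java.util.List;
import java.util.function.Function;

public class GuyComposer {

    // Same shape as the non-reactive interface in the last variant.
    interface GuyWithReactiveReturnTypeMethod {
        String execute(String string);
    }

    // Folds the list into one function that runs every guy in list order,
    // feeding each guy's result into the next guy.
    static Function<String, String> compose(List<GuyWithReactiveReturnTypeMethod> guys) {
        Function<String, String> pipeline = Function.identity();
        for (GuyWithReactiveReturnTypeMethod guy : guys) {
            pipeline = pipeline.andThen(guy::execute);
        }
        return pipeline;
    }

    public static void main(String[] args) {
        List<GuyWithReactiveReturnTypeMethod> guys = List.of(
                s -> s + "1",
                s -> s + "2",
                s -> s + "3");

        Function<String, String> pipeline = compose(guys);
        System.out.println(pipeline.apply("start")); // prints "start123"

        // With Reactor available, the whole chain becomes one step on one flow:
        // Mono.just("start").map(pipeline).subscribe(System.out::println);
    }
}
```

This only fits when each step is synchronous. If a step does real asynchronous work, it still has to return a publisher and be chained with flatMap as in the first variant.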
Reference: Reactive Programming: Spring WebFlux: How to build a chain of micro-service calls?