Search code examples
javaspringreactive-programmingspring-webflux

Chained reactive components invocation


I have a list of following objects with method returning reactive type Mono<?>:

interface GuyWithReactiveReturnTypeMethod {

    Mono<String> execute();
}

class ReactiveGuysInvocator {

    Mono<String> executeAllGuys(List<GuyWithReactiveReturnTypeMethod> guysToInvoke) {
        ???
    }
}

And I need to invoke all the guys one by one (n's guy result is n+1's guy argument), but I'm not sure how can I iterate over such list. I thought of flatMaping next guy in a while loop:

public interface GuyWithReactiveReturnTypeMethod {

    Mono<String> execute(String string);
}

class ReactiveGuysInvocator {

    Mono<String> executeAllGuys(List<GuyWithReactiveReturnTypeMethod> guysToExecute) {
        ListIterator<GuyWithReactiveReturnTypeMethod> iterator = guysToExecute.listIterator();
        Mono<String> currentResult = Mono.just("start");
        while (iterator.hasNext()) {
            GuyWithReactiveReturnTypeMethod guyToInvoke = iterator.next();
            currentResult = currentResult.flatMap(guyToInvoke::execute)
                    .doOnNext(object -> System.out.println("Executed!"))
                    .doOnError(error -> System.out.println("Error"));
        }
        return currentResult;
    }
}

But this approach seems to be completely incorrect. Does anyone know how could I implement something like this?


Solution

  • UPDATE: flatMap can be easily abused. Make sure that you are doing asynchronous work when using flatMap. Mostly, it seems to me, that you can do pretty well with a minimum of Mono.just.

    Flatmap is what you have to do with the constraints you provide.

    executeAllGuys(Arrays.asList(new GuyWithReactiveReturnTypeMethod[] {
            (s)->Mono.just(s+"1"), 
            (s)->Mono.just(s+"2"), 
            (s)->Mono.just(s+"3")})) 
    .subscribe(System.out::println);
    
    Mono<String> executeAllGuys(List<GuyWithReactiveReturnTypeMethod> guysToExecute) {
        // your flow is starting here
        Mono<String> stringMono = Mono.just("start");
        for ( GuyWithReactiveReturnTypeMethod guyToInvoke: guysToExecute) {
            stringMono = stringMono.flatMap(guyToInvoke::execute);
        }
        return stringMono;
    }
    

    Just look at all those Mono.just calls. Why do you want to create N+1 flows to do the job? The real problem is you're creating a new flow every time you execute the interface method. Flatmap stops the current flow and starts a new one with the publisher returned by the flatMap method. Try to think reactive and treat the whole business like a stream. There is no flatMap in Streams. A reactive execution should be done on only a single flow.

    A Mono<String> execute(String string) is not a reactive component. It is a reactive producer. A Mono<String> execute(Mono<String> string) is a reactive component.

    1. Make your interface more reactive by taking a Mono in and returning a Mono. Your application is doing a map conversion on at each step. This is "chaining reactive components".

      executeAllGuys(Arrays.asList(new GuyWithReactiveReturnTypeMethod[] { (s)->s.map(str->str+"1"), (s)->s.map(str->str+"2"), (s)->s.map(str->str+"3")})) .subscribe(System.out::println);

      Mono executeAllGuys(List guysToExecute) { // your flow is starting here Mono stringMono = Mono.just("start"); for ( GuyWithReactiveReturnTypeMethod guyToInvoke: guysToExecute) { stringMono = guyToInvoke.execute(stringMono); } return stringMono; }

      interface GuyWithReactiveReturnTypeMethod { Mono execute(Mono string); }

    2. Make your interface less reactive but make your application more reactive by using a Flux instead of a list. You will then have to use reduce to convert a Flux to a Mono. Your application is doing a Map/Reduce function. I don't think a Flux will guarantee execution order of the elements in the flow but it could executeAllGuys more efficiently.

      // your flow is starting here executeAllGuys(Flux.just( (s)->s+"1", (s)->s+"2", (s)->s+"3")) .subscribe(System.out::println);

      Mono executeAllGuys(Flux guysToExecute) { return guysToExecute.reduce("start", (str, guyToInvoke)->guyToInvoke.execute(str)); } interface GuyWithReactiveReturnTypeMethod { String execute(String string); }

    Reference: Reactive Programming: Spring WebFlux: How to build a chain of micro-service calls?