Search code examples
javareactive-programmingproject-reactorspring-webclient

Java Reactor: Is there a way to transform Flux<Mono<T>> into Flux<T> without eager fetching?


I have a fast, but expensive producer (Spring WebClient) and a very slow subscriber. I need a way to honor backpressure throughout the chain.

During implementation I realized that flatMap, concatMap and others use eager fetching and there seems to be no possibility to disable this behavior.

Using demand in the subscriber without flatMap

Flux.defer(() -> Flux.range(1, 1000))
            .doOnRequest(i -> System.out.println("Requested: " + i))
            .doOnNext(v -> System.out.println("Emitted:   " + v))
            //.flatMap(Mono::just)
            .subscribe(new BaseSubscriber<Object>() {
                protected void hookOnSubscribe(final Subscription subscription) {
                    subscription.request(3);
                }

                protected void hookOnNext(final Object value) {
                    System.out.println("Received:  " + value);
                }
            });

.. produces:

Requested: 3
Emitted:   1
Received:  1
Emitted:   2
Received:  2
Emitted:   3
Received:  3

Using same demand with flatMap (uncommented) produces:

Requested: 256
Emitted:   1
Received:  1
Emitted:   2
Received:  2
Emitted:   3
Received:  3
Emitted:   4
Emitted:   5
...
Emitted:   254
Emitted:   255
Emitted:   256

Solution

  • It seems there is an open issue for this: https://github.com/reactor/reactor-core/issues/1397

    Anyway, I found a solution for my situation: block(). Keep in mind that this operation is only allowed on Threads which are not marked as "non-blocking operations only". (See also Project Blockhound)

    To recap, the issue is that at some point I have a Flux<Mono<T>> and .flatMap(...), .concatMap(...), etc. use some kind of eager fetching. The Flux<Mono<T>> used for testing:

    final Flux<Mono<Integer>> monoFlux = Flux.<Mono<Integer>, Integer>generate(
    () -> 0, 
    (state, sink) -> {
        state += 1;
        sink.next(Mono.just(state));
        return state;
    }).doOnRequest(i -> System.out.println("Requested: " + i))
      .doOnNext(v -> System.out.println("Emitted:   " + v));
    

    In order to not have eager fetching, I now do a block inside a map and it works surprisingly well:

    monoFlux.map(Mono::block)
            .subscribe(new MySubscriber<>());
    

    Result:

    Requested: 3
    Emitted:   MonoJust
    Received:  1
    Emitted:   MonoJust
    Received:  2
    Emitted:   MonoJust
    Received:  3