I have a fast but expensive producer (Spring WebClient) and a very slow subscriber, so I need backpressure to be honored throughout the chain.
During implementation I realized that flatMap, concatMap and similar operators prefetch eagerly from upstream, and there seems to be no way to disable this behavior.
Using demand in the subscriber, without flatMap:
Flux.defer(() -> Flux.range(1, 1000))
    .doOnRequest(i -> System.out.println("Requested: " + i))
    .doOnNext(v -> System.out.println("Emitted: " + v))
    //.flatMap(Mono::just)
    .subscribe(new BaseSubscriber<Object>() {
        @Override
        protected void hookOnSubscribe(final Subscription subscription) {
            // Ask for exactly three elements and never request more.
            subscription.request(3);
        }
        @Override
        protected void hookOnNext(final Object value) {
            System.out.println("Received: " + value);
        }
    });
.. produces:
Requested: 3
Emitted: 1
Received: 1
Emitted: 2
Received: 2
Emitted: 3
Received: 3
Using the same demand with the flatMap line uncommented produces:
Requested: 256
Emitted: 1
Received: 1
Emitted: 2
Received: 2
Emitted: 3
Received: 3
Emitted: 4
Emitted: 5
...
Emitted: 254
Emitted: 255
Emitted: 256
It seems there is an open issue for this: https://github.com/reactor/reactor-core/issues/1397
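The 256 above matches flatMap's default concurrency, which is also the amount it requests from upstream when it subscribes. As a minimal sketch (not a fix for the linked issue), the overload flatMap(mapper, concurrency) can shrink that initial request. The class name FlatMapConcurrencyDemo and the recording lists are hypothetical helpers added for illustration. With a concurrency of 1 the operator may still pull one element more than the subscriber asked for, so this reduces the eager fetching rather than disabling it.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapConcurrencyDemo {

    // Hypothetical recorders, only here so the behavior can be inspected afterwards.
    static final List<Long> requests = new ArrayList<>();
    static final List<Integer> emitted = new ArrayList<>();
    static final List<Integer> received = new ArrayList<>();

    static void run() {
        requests.clear();
        emitted.clear();
        received.clear();

        Flux.defer(() -> Flux.range(1, 1000))
            .doOnRequest(n -> {
                requests.add(n);
                System.out.println("Requested: " + n);
            })
            .doOnNext(v -> {
                emitted.add(v);
                System.out.println("Emitted: " + v);
            })
            // Second argument is the concurrency: at most one inner publisher
            // at a time, so the first upstream request is 1 instead of 256.
            .flatMap(Mono::just, 1)
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(final Subscription subscription) {
                    // Same fixed demand as in the question.
                    subscription.request(3);
                }

                @Override
                protected void hookOnNext(final Integer value) {
                    received.add(value);
                    System.out.println("Received: " + value);
                }
            });
    }

    public static void main(final String[] args) {
        run();
    }
}
```

Running main prints several small requests instead of a single request for 256. Whether one extra prefetched element is acceptable depends on how expensive each upstream element is.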
Anyway, I found a solution for my situation: block(). Keep in mind that this operation is only allowed on threads that are not marked as "non-blocking operations only". (See also Project BlockHound.)
To recap, the issue is that at some point I have a Flux<Mono<T>>, and .flatMap(...), .concatMap(...), etc. use some kind of eager fetching. The Flux<Mono<T>> used for testing:
final Flux<Mono<Integer>> monoFlux = Flux.<Mono<Integer>, Integer>generate(
        () -> 0,
        (state, sink) -> {
            state += 1;
            sink.next(Mono.just(state));
            return state;
        })
    .doOnRequest(i -> System.out.println("Requested: " + i))
    .doOnNext(v -> System.out.println("Emitted: " + v));
To avoid eager fetching, I now call block() inside a map, and it works surprisingly well:
monoFlux.map(Mono::block)
    .subscribe(new MySubscriber<>());
Result:
Requested: 3
Emitted: MonoJust
Received: 1
Emitted: MonoJust
Received: 2
Emitted: MonoJust
Received: 3
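The snippets above do not show MySubscriber. The following self-contained sketch assumes it mirrors the BaseSubscriber from the first example (a single request for 3 elements). The class name BlockInMapDemo and the recording lists are hypothetical additions for illustration. Because map passes the downstream demand through unchanged, the generator is asked for only three elements.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BlockInMapDemo {

    /** Hypothetical stand-in for MySubscriber: requests 3 items once and records them. */
    static class MySubscriber<T> extends BaseSubscriber<T> {
        final List<T> received = new ArrayList<>();

        @Override
        protected void hookOnSubscribe(final Subscription subscription) {
            subscription.request(3);
        }

        @Override
        protected void hookOnNext(final T value) {
            received.add(value);
            System.out.println("Received: " + value);
        }
    }

    // Hypothetical recorder for the demand seen by the generator.
    static final List<Long> requests = new ArrayList<>();

    static List<Integer> run() {
        requests.clear();

        final Flux<Mono<Integer>> monoFlux = Flux.<Mono<Integer>, Integer>generate(
                () -> 0,
                (state, sink) -> {
                    final int next = state + 1;
                    sink.next(Mono.just(next));
                    return next;
                })
            .doOnRequest(n -> {
                requests.add(n);
                System.out.println("Requested: " + n);
            })
            .doOnNext(v -> System.out.println("Emitted: " + v));

        final MySubscriber<Integer> subscriber = new MySubscriber<>();
        // block() resolves each Mono synchronously; map adds no prefetch of its own.
        // Only safe on threads where blocking is permitted.
        monoFlux.map(Mono::block).subscribe(subscriber);
        return subscriber.received;
    }

    public static void main(final String[] args) {
        run();
    }
}
```

Here every inner Mono is a Mono.just, so block() returns immediately. With a real WebClient call, block() would hold the calling thread until the response arrives, so the pipeline should run on a scheduler that permits blocking.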