Tags: java, project-reactor, reactive-streams

Why do some Reactor operators request far more elements than they are interested in?


I have the following code:

Flux<String> flux = Flux.<String>never()
        .doOnRequest(n -> System.out.println("Requested " + n));

It is a Flux that never emits any signal, but reports demand to the console.

Each of the following 3 lines

flux.take(3).next().block();
flux.next().block();
flux.blockFirst();

produces this output:

Requested 9223372036854775807

Looking at the code, I see the following.

BlockingSingleSubscriber (used in both the Flux#blockFirst() and Mono#block() cases):

public final void onSubscribe(Subscription s) {
    this.s = s;
    if (!cancelled) {
        s.request(Long.MAX_VALUE);
    }
}

MonoNext.NextSubscriber:

public void request(long n) {
    if (WIP.compareAndSet(this, 0, 1)) {
        s.request(Long.MAX_VALUE);
    }
}

FluxTake.TakeSubscriber:

public void request(long n) {
    if (wip == 0 && WIP.compareAndSet(this, 0, 1)) {
        if (n >= this.n) {
            s.request(Long.MAX_VALUE);
        } else {
            s.request(n);
        }
        return;
    }

    s.request(n);
}

So Flux#blockFirst(), Flux#next() and Mono#block() always signal an unbounded demand to their upstream, and Flux#take() can do the same under some circumstances.

But Flux#blockFirst(), Flux#next() and Mono#block() each need at most one element from their upstream, and Flux#take() needs at most this.n.
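
The bounded branch of TakeSubscriber is easy to observe too: if the downstream request is smaller than take()'s limit, the demand propagates as-is. A minimal sketch, assuming reactor.core.publisher.BaseSubscriber and org.reactivestreams.Subscription are imported, and reusing the flux from above:

flux.take(3).subscribe(new BaseSubscriber<String>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // 2 < this.n (3), so TakeSubscriber forwards the demand as-is
        request(2); // prints "Requested 2" rather than Long.MAX_VALUE
    }
});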

Also, Flux#take() javadoc says the following:

Note that this operator doesn't manipulate the backpressure requested amount. Rather, it merely lets requests from downstream propagate as is and cancels once N elements have been emitted. As a result, the source could produce a lot of extraneous elements in the meantime. If that behavior is undesirable and you do not own the request from downstream (e.g. prefetching operators), consider using {@link #limitRequest(long)} instead.

The question is: why do they signal an unbounded demand when they know the limit upfront? I had the impression that reactive backpressure was about only asking for what you are ready to consume. But in reality, it often works like this: shout 'produce all you can' to the upstream, then cancel the subscription once satisfied. In cases where it is costly to produce a gazillion records upstream, this seems simply wasteful.


Solution

  • tl;dr - Requesting only what you need is usually ideal in a pull-based system, but very rarely ideal in a push-based system.

    I had the impression that reactive backpressure was about only asking for what you are ready to consume.

    Not quite - it's what you are able to consume. The difference is subtle, but important.

    In a pull-based system, you'd be entirely correct - requesting more values than you know you'll ever need would almost never be a good thing, as the more values you request, the more work needs to happen to produce those values.

    But note that reactive streams are inherently push-based, not pull-based. Most reactive frameworks, Reactor included, are built with this in mind - and while hybrid or pull-based semantics are possible (using Flux.generate() to produce elements one at a time on demand, for example), this is very much a secondary use case. The norm is to have a publisher that has a bunch of data it needs to offload, and it "wants" to push that to you as quickly as possible to be rid of it.
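
    For illustration, here's roughly what that secondary, pull-style use case looks like - a minimal sketch in which each element is computed only when the downstream actually has demand for it:

    Flux<Integer> pulled = Flux.generate(
            () -> 0,                          // initial state
            (state, sink) -> {
                System.out.println("Producing " + state);
                sink.next(state);             // exactly one element per request
                return state + 1;
            });

    // Only three values are ever produced; nothing is computed speculatively.
    pulled.limitRequest(3).subscribe(System.out::println);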

    This push-based norm is important, as it flips the view of what's ideal from a requesting perspective. The question is no longer "What's the most I'll ever need?", but instead "What's the most I can ever deal with?" - the bigger the number, the better.

    As an example, let's say I have a database query returning 2000 records connected to a Flux - but I only want 1. If I have a publisher that's pushing these 2000 records and I call request(1), then I'm not "helping" things at all - I haven't caused less processing on the database side; those records are already there and waiting. Since I've only requested 1, however, the publisher must then decide whether it can buffer the remaining records, whether it's best to skip some or all of them, whether it should throw an exception if it can't keep up, or something else entirely. Whatever it does, I'm actually causing more work, and possibly even an exception in some cases, by requesting fewer records.

    Granted, this is not always desirable - perhaps those extra elements in the Flux really do cause extra processing that's wasteful, perhaps network bandwidth is a primary concern, etc. In that case, you'd want to explicitly call limitRequest(). In most cases though, that's probably not the behaviour you're after.
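
    Going back to the original example, the difference is easy to see:

    flux.take(1).subscribe();         // prints "Requested 9223372036854775807"
    flux.limitRequest(1).subscribe(); // prints "Requested 1"

    Only the limitRequest() variant caps the demand that the upstream actually sees.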

    (For completeness' sake, the best scenario is of course to limit the data at the source - put a LIMIT 1 on your database query if you only want a single value, for instance. Then you don't have to worry about any of this stuff. But in real-world usage that's not always possible.)