I'm playing around with the Flow API and, so far, I understand that the request() method is used for back pressure. Most articles state that this is akin to controlling the speed of consumption. However, almost every code example I see passes the value 1 into the request() method, as in subscription.request(1). What I don't quite understand is how the request() method controls the speed of consumption.
I ran a test by submitting a bunch of items to the publisher and printing the thread name, and every single onNext() seems to run on the same worker thread, whether I use request(1) or request(50):
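@Override
public void onNext(T item) {
    System.out.println(Thread.currentThread().getName());
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        // onNext cannot throw checked exceptions, so restore the interrupt flag
        Thread.currentThread().interrupt();
    }
    subscription.request(50);
}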
If the onNext() calls were running on different threads, I could understand how the n value passed into request(n) would affect the rate at which items are processed in parallel (running on n threads). But that doesn't seem to be the case in my test, as they all run under the same thread name.
In this case, what's the difference between request(1) and request(50) when they are all still going to be run sequentially, one after another, on the same thread? Wouldn't the consumption rate still be the same then?
The n in request(n) indicates how many elements the subscriber can accept and puts a limit on how many items the upstream Publisher may emit. Therefore, the generator is slowed down not per individual item but per batch: what is throttled is the average time per generated batch, since each batch is interleaved with the consumer's processing time.
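To make the limit concrete, here is a minimal sketch using the JDK's SubmissionPublisher. The class and method names (RequestLimitDemo, BoundedSubscriber, run) are made up for illustration. The subscriber requests a fixed number of items once and never asks again, so the remaining items should simply stay in the publisher's buffer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class RequestLimitDemo {

    /** Hypothetical subscriber that requests a fixed number of items once and never again. */
    static final class BoundedSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch gotAll;
        final int demand;

        BoundedSubscriber(int demand) {
            this.demand = demand;
            this.gotAll = new CountDownLatch(demand);
        }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
        @Override public void onNext(Integer item) { received.add(item); gotAll.countDown(); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Submits 0..9 and returns what a subscriber with the given demand actually received. */
    static List<Integer> run(int demand) {
        BoundedSubscriber sub = new BoundedSubscriber(demand);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(sub);
            for (int i = 0; i < 10; i++) {
                publisher.submit(i); // buffered; does not block for so few items
            }
            sub.gotAll.await(5, TimeUnit.SECONDS);
            Thread.sleep(200); // grace period: unrequested items must not show up
            return List.copyOf(sub.received);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [0, 1, 2]
    }
}
```

Ten items are submitted, but only the three that were requested reach onNext. The other seven wait until more demand is signalled, which is the sense in which n limits the upstream.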
onNext is executed in a serialized fashion and, depending on the upstream, on the same thread as well. Thus, calling request in there usually tells the upstream that it may call the same onNext again with the next value, if available, after the current call has ended. That is, calling Thread.sleep will postpone this next invocation of onNext.
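The serialization can be checked with a small experiment. The sketch below (all names hypothetical, again built on SubmissionPublisher) counts how many onNext calls are in progress at the same time. Whatever n is passed to request, that maximum should stay at 1, because the next call only starts after the current one has returned.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SerializedOnNextDemo {

    /** Hypothetical slow subscriber that records how many onNext calls overlap. */
    static final class SlowSubscriber implements Flow.Subscriber<Integer> {
        final long n;
        final AtomicInteger inFlight = new AtomicInteger();
        final AtomicInteger maxInFlight = new AtomicInteger();
        final AtomicInteger count = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);
        Flow.Subscription subscription;

        SlowSubscriber(long n) { this.n = n; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(n);
        }

        @Override public void onNext(Integer item) {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(20); // simulate slow processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            count.incrementAndGet();
            inFlight.decrementAndGet();
            subscription.request(n); // same pattern as in the question
        }

        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Returns {items delivered, maximum number of overlapping onNext calls}. */
    static int[] run(long n) {
        SlowSubscriber sub = new SlowSubscriber(n);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(sub);
            for (int i = 0; i < 10; i++) {
                publisher.submit(i);
            }
        } // close() leads to onComplete after the buffered items are delivered
        try {
            sub.done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { sub.count.get(), sub.maxInFlight.get() };
    }

    public static void main(String[] args) {
        for (long n : new long[] { 1, 50 }) {
            int[] r = run(n);
            System.out.println("n=" + n + " delivered=" + r[0] + " maxConcurrent=" + r[1]);
        }
    }
}
```

Both runs should deliver all ten items with at most one onNext in progress at a time, which matches the observation in the question that request(1) and request(50) look the same for a subscriber that does its work inside onNext.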
Generally, there is no reason to call request in the onNext of an end-Subscriber, because it runs synchronously with respect to its immediate upstream Publisher, and there is no practical difference between a single request(Long.MAX_VALUE) and repeated request(1) calls.
One of the few reasons to call request is when onNext forks off asynchronous work itself and more items should be requested only at the end of that work:
// submit(Callable) is used instead of execute(Runnable) so the task may
// throw InterruptedException and return a value
ExecutorService executor = ...
Subscription upstream;

@Override public void onSubscribe(Subscription s) {
    this.upstream = s;
    executor.submit(() -> {
        Thread.sleep(5000);
        s.request(1);
        return null; // Callable
    });
}

@Override public void onNext(T item) {
    System.out.println("Start onNext");
    executor.submit(() -> {
        System.out.println("Run work");
        Thread.sleep(5000);
        System.out.println("Request more work");
        upstream.request(1);
        return null; // Callable
    });
    System.out.println("End onNext");
}
With this setup, the upstream will call onNext once and will call it again only when the task run by the executor has issued the next request. Note, though, that unless the Publisher emits from a dedicated thread, the example above will eventually drag the onNext invocation onto the executor's thread.
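For anyone who wants to try this end to end, here is a self-contained sketch of the same idea. It assumes a SubmissionPublisher as the upstream and a single-threaded ExecutorService as the worker. AsyncRequestDemo, AsyncSubscriber and the event strings are made up for illustration. It records the order of events so the hand-off between onNext and the asynchronous task is visible.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class AsyncRequestDemo {

    /** Hypothetical subscriber that requests the next item only after async work finishes. */
    static final class AsyncSubscriber implements Flow.Subscriber<Integer> {
        final ExecutorService worker;
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch workDone;
        Flow.Subscription upstream;

        AsyncSubscriber(ExecutorService worker, int expectedItems) {
            this.worker = worker;
            this.workDone = new CountDownLatch(expectedItems);
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            s.request(1); // ask for the first item only
        }

        @Override public void onNext(Integer item) {
            events.add("onNext " + item);
            worker.submit(() -> {
                Thread.sleep(50);          // simulated asynchronous work
                events.add("done " + item);
                upstream.request(1);       // only now allow the next onNext
                workDone.countDown();
                return null;               // makes the lambda a Callable
            });
        }

        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Publishes 0..items-1 and returns the recorded event order. */
    static List<String> run(int items) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        AsyncSubscriber sub = new AsyncSubscriber(worker, items);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(sub);
            for (int i = 0; i < items; i++) {
                publisher.submit(i);
            }
        }
        try {
            sub.workDone.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return List.copyOf(sub.events);
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

With three items the recorded order should be onNext 0, done 0, onNext 1, done 1, onNext 2, done 2: each onNext is held back until the previous task has called request(1), even though onNext itself returns immediately.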