I am learning reactive streams and the publish-subscribe model, and I am using the default behaviour of Publisher (Flux in my case) and Subscriber.
I have two scenarios, both with the same number of elements in the Flux. But when I analyse the logs, the subscriber requests a different number of elements in each case (in one case it requests an unbounded number of elements, and in the other it requests 32).
Here are the two cases:
System.out.println("*********Calling MapData************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
    .log()
    .map(i -> i * 2)
    .subscribe(elements::add);
//printElements(elements);
System.out.println("-------------------------------------");
System.out.println("Inside Combine Streams");
List<Integer> elems = new ArrayList<>();
Flux.just(10,20,30,40)
    .log()
    .map(x -> x * 2)
    .zipWith(Flux.range(0, Integer.MAX_VALUE),
        (two, one) -> String.format("First : %d, Second : %d \n", one, two))
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) {
        }
    });
System.out.println("-------------------------------------");
And here are the logs:
*********Calling MapData************
[warn] LoggerFactory has not been explicitly initialized. Default system-logger will be used. Please invoke StaticLoggerBinder#setLog(org.apache.maven.plugin.logging.Log) with Mojo's Log instance at the early start of your Mojo
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(unbounded)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Inside Combine Streams
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(32)
[info] | onNext(10)
[info] | onNext(20)
[info] | onNext(30)
[info] | onNext(40)
[info] | onComplete()
[info] | cancel()
-------------------------------------
I have not used any customised Subscriber implementation, so why does the "MapData" case log "[info] | request(unbounded)" while the "Inside Combine Streams" case logs "[info] | request(32)"?
Please suggest.
First, you should know that this is the expected behavior.
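Depending on the operators you're using, Reactor will apply different prefetching strategies: a plain lambda subscriber requests an unbounded amount, while operators that buffer internally (zipWith in your second case) prefetch a default amount such as 32 or 256.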
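To make the difference visible without reading log output, here is a minimal sketch that records every request(n) the source receives in each of the two scenarios from the question. The class name RequestDemandDemo and its method names are made up for illustration, and doOnRequest stands in for log(); it assumes reactor-core is on the classpath and that Reactor's default buffer sizes have not been overridden.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original question.
final class RequestDemandDemo {

    // Scenario 1: map + lambda subscriber. Records the demand seen by the source.
    static List<Long> demandFromLambdaSubscribe() {
        List<Long> demand = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
            .doOnRequest(n -> demand.add(n)) // capture each request(n) signal
            .map(i -> i * 2)
            .subscribe(i -> { });
        return demand;
    }

    // Scenario 2: the same kind of pipeline, but zipped with a second source.
    static List<Long> demandFromZipWith() {
        List<Long> demand = new ArrayList<>();
        Flux.just(10, 20, 30, 40)
            .doOnRequest(n -> demand.add(n)) // capture each request(n) signal
            .map(x -> x * 2)
            .zipWith(Flux.range(0, Integer.MAX_VALUE),
                (value, index) -> index + ":" + value)
            .subscribe(s -> { });
        return demand;
    }

    public static void main(String[] args) {
        System.out.println("lambda subscribe: " + demandFromLambdaSubscribe());
        System.out.println("zipWith:          " + demandFromZipWith());
    }
}
```

With default settings, the first list should contain Long.MAX_VALUE (which log() prints as "unbounded") and the second should contain 32, matching the logs in the question. The request comes from whatever sits directly downstream: the lambda subscriber in the first case, the zip operator in the second.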
You can always change this behavior if you use the operator variants that take an int prefetch method argument, or if you implement your own Subscriber using BaseSubscriber (which provides several useful methods for that).
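Both options can be sketched roughly as follows. This is an illustration rather than a recommended setup: the class CustomDemandDemo, the BatchingSubscriber class, the batch size of 2 and the prefetch value passed in are all arbitrary choices made for this example. It uses the Flux.zip overload that accepts a combinator and an int prefetch, and doOnRequest only to record what the source is asked for.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the original answer.
final class CustomDemandDemo {

    // Option 1: pass an explicit prefetch to the operator instead of relying on the default.
    static List<Long> demandWithExplicitPrefetch(int prefetch) {
        List<Long> demand = new ArrayList<>();
        Flux<Integer> left = Flux.just(10, 20, 30, 40)
            .doOnRequest(n -> demand.add(n)) // capture each request(n) signal
            .map(x -> x * 2);
        Flux<Integer> right = Flux.range(0, Integer.MAX_VALUE);
        // zip variant taking (combinator, prefetch, sources...)
        Flux<String> zipped = Flux.zip(pair -> pair[1] + ":" + pair[0], prefetch, left, right);
        zipped.subscribe(s -> { });
        return demand;
    }

    // Option 2: a custom subscriber that asks for elements two at a time.
    static final class BatchingSubscriber extends BaseSubscriber<Integer> {
        private int receivedInBatch = 0;

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(2); // initial demand, replaces the unbounded default
        }

        @Override
        protected void hookOnNext(Integer value) {
            if (++receivedInBatch == 2) {
                receivedInBatch = 0;
                request(2); // ask for the next batch once this one is consumed
            }
        }
    }

    static List<Long> demandWithBaseSubscriber() {
        List<Long> demand = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
            .doOnRequest(n -> demand.add(n)) // capture each request(n) signal
            .map(i -> i * 2)
            .subscribe(new BatchingSubscriber());
        return demand;
    }

    public static void main(String[] args) {
        System.out.println("zip with prefetch 8: " + demandWithExplicitPrefetch(8));
        System.out.println("BaseSubscriber:      " + demandWithBaseSubscriber());
    }
}
```

In the first method the source should be asked for 8 elements up front instead of 32; in the second it should only ever be asked for 2 at a time instead of an unbounded amount.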
The bottom line is that you often don't need to pay attention to that particular value; it is only useful if you want to optimize the prefetch strategy for a particular data source.