
Why is Subscriber requesting different number of elements in different cases?


I am learning reactive streams and the publish-subscribe model, and I am using the default behaviour of Publisher (Flux in my case) and Subscriber.

I have two scenarios, both with the same number of elements in the Flux. But when I analyse the logs, the subscriber requests a different number of elements in each case: in one it calls request(unbounded), and in the other request(32).

Here are the two cases and the logs:

        System.out.println("*********Calling MapData************");
        List<Integer> elements = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
          .log()
          .map(i -> i * 2)
          .subscribe(elements::add);
        //printElements(elements);
        System.out.println("-------------------------------------");

        System.out.println("Inside Combine Streams");
        List<Integer> elems = new ArrayList<>();
        Flux.just(10,20,30,40)
            .log()
            .map(x -> x * 2)
            .zipWith(Flux.range(0, Integer.MAX_VALUE),
                (two, one) -> String.format("First  : %d, Second : %d \n", one, two))
            .subscribe(new Consumer<String>() {
              @Override
              public void accept(String s) {

              }
            });
        System.out.println("-------------------------------------");

And here are the logs:

*********Calling MapData************
[warn] LoggerFactory has not been explicitly initialized. Default system-logger will be used. Please invoke StaticLoggerBinder#setLog(org.apache.maven.plugin.logging.Log) with Mojo's Log instance at the early start of your Mojo
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(unbounded)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------
Inside Combine Streams
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(32)
[info] | onNext(10)
[info] | onNext(20)
[info] | onNext(30)
[info] | onNext(40)
[info] | onComplete()
[info] | cancel()
-------------------------------------

As I have not used any custom Subscriber implementation, why does the "MapData" case log "[info] | request(unbounded)" while the "Inside Combine Streams" case logs "[info] | request(32)"?

Please suggest.


Solution

  • First, you should know that this is the expected behavior.

    Depending on the operators you're using, Reactor will apply different prefetching strategies:

    • some operators will use default values like 32 or 256 (zipWith, used in your second case, prefetches 32 by default)
    • some arrangements will use the value you provided if you added a buffering operator with a specific value
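    • when no operator in the chain limits demand, the request made by the final subscriber reaches the source unchanged; the subscriber created by subscribe(Consumer), used in your first case, requests an unbounded amount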
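
    The two request sizes can be reproduced with a minimal sketch, assuming reactor-core is on the classpath. It rebuilds both pipelines from the question and records every request(n) that reaches the source, with doOnRequest standing in for log(). The class and method names are illustrative only, and the combinator is simplified to a sum.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class RequestSizes {

    // Plain lambda subscriber: nothing between the source and the
    // subscriber changes the demand, so the subscriber's own request
    // is what the source sees.
    static List<Long> plainSubscribe() {
        List<Long> requests = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
            .doOnRequest(n -> requests.add(n)) // record each request(n)
            .map(i -> i * 2)
            .subscribe(i -> { });
        return requests;
    }

    // Same kind of source, but zipWith sits downstream and issues its
    // own request to each of its two inputs.
    static List<Long> zipped() {
        List<Long> requests = new ArrayList<>();
        Flux.just(10, 20, 30, 40)
            .doOnRequest(n -> requests.add(n)) // record each request(n)
            .map(x -> x * 2)
            .zipWith(Flux.range(0, Integer.MAX_VALUE), (left, right) -> left + right)
            .subscribe(i -> { });
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(plainSubscribe());
        System.out.println(zipped());
    }
}
```

    With Reactor's default settings this should print [9223372036854775807] (Long.MAX_VALUE, which log() renders as "unbounded") and [32], in line with the logs in the question.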

    You can always change this behavior by using the operator variants that take an int prefetch argument, or by implementing your own Subscriber with BaseSubscriber (which provides several useful methods for that).
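
    Both options might look like the following sketch, again assuming reactor-core is on the classpath. The prefetch value passed in and the one-element-at-a-time strategy are arbitrary choices for illustration, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class CustomDemand {

    // Option 1: the zipWith overload that takes an explicit prefetch.
    static List<Long> zipWithPrefetch(int prefetch) {
        List<Long> requests = new ArrayList<>();
        Flux.just(10, 20, 30, 40)
            .doOnRequest(n -> requests.add(n)) // record each request(n)
            .map(x -> x * 2)
            .zipWith(Flux.range(0, Integer.MAX_VALUE), prefetch, (left, right) -> left + right)
            .subscribe(i -> { });
        return requests;
    }

    // Option 2: a BaseSubscriber that pulls one element at a time
    // instead of requesting an unbounded amount up front.
    static List<Long> oneAtATime() {
        List<Long> requests = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
            .doOnRequest(n -> requests.add(n)) // record each request(n)
            .map(i -> i * 2)
            .subscribe(new BaseSubscriber<Integer>() {
                @Override
                protected void hookOnSubscribe(Subscription subscription) {
                    request(1); // initial demand: a single element
                }

                @Override
                protected void hookOnNext(Integer value) {
                    request(1); // ask for the next element after each one
                }
            });
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(zipWithPrefetch(8));
        System.out.println(oneAtATime());
    }
}
```

    In the first method the source should see the prefetch value you passed instead of the default 32; in the second, every recorded request should be 1.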

    The bottom line is that you often don't need to pay attention to that particular value; it is only worth tuning if you want to optimize the prefetch strategy for a particular data source.