Search code examples
javaproject-reactorflux

Flux.range waits to emit more element once 256 elements are reached


I wrote this code:

Flux.range(0, 300)
            .doOnNext(i -> System.out.println("i = " + i))
            .flatMap(i -> Mono.just(i)
                            .subscribeOn(Schedulers.elastic())
                            .delayElement(Duration.ofMillis(1000))
            )
            .doOnNext(i -> System.out.println("end " + i))
            .blockLast();

When running it, the first System.out.println shows that the Flux stop emitting numbers at the 256th element, then it waits for the older to be completed before emitting new ones.

Why is this happening?
Why 256?


Solution

  • Why this happening?

    The flatMap operator can be characterized as operator that (rephrased from javadoc):

    1. subscribes to its inners eagerly
    2. does not preserve ordering of elements.
    3. lets values from different inners interleave.

    For this question the first point is important. Project Reactor restricts the number of in-flight inner sequences via concurrency parameter.

    While flatMap(mapper) uses the default parameter the flatMap(mapper, concurrency) overload accepts this parameter explicitly.

    The flatMaps javadoc describes the parameter as:

    The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel

    Consider the following code using concurrency = 500

    Flux.range(0, 300)
            .doOnNext(i -> System.out.println("i = " + i))
            .flatMap(i -> Mono.just(i)
                            .subscribeOn(Schedulers.elastic())
                            .delayElement(Duration.ofMillis(1000)),
                    500
    //         ^^^^^^^^^^
            )
            .doOnNext(i -> System.out.println("end " + i))
            .blockLast();
    

    In this case there is no waiting:

    i = 297
    i = 298
    i = 299
    end 0
    end 1
    end 2
    

    In contrast if you pass 1 as concurrency the output will be similar to:

    i = 0
    end 0
    i = 1
    end 1
    

    Awaiting one second before emitting the next element.

    Why 256?

    256 is the default value for concurrency of flatMap.

    Take a look at Queues.SMALL_BUFFER_SIZE:

    public static final int SMALL_BUFFER_SIZE = Math.max(16,
            Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));