I wrote this code:
Flux.range(0, 300)
.doOnNext(i -> System.out.println("i = " + i))
.flatMap(i -> Mono.just(i)
.subscribeOn(Schedulers.elastic())
.delayElement(Duration.ofMillis(1000))
)
.doOnNext(i -> System.out.println("end " + i))
.blockLast();
When running it, the first System.out.println
shows that the Flux stop emitting numbers at the 256th element, then it waits for the older to be completed before emitting new ones.
Why is this happening?
Why 256?
The flatMap
operator can be characterized as operator that (rephrased from javadoc):
For this question the first point is important. Project Reactor restricts the
number of in-flight inner sequences via concurrency
parameter.
While flatMap(mapper)
uses the default parameter the flatMap(mapper, concurrency)
overload accepts this parameter explicitly.
The flatMap
s javadoc describes the parameter as:
The concurrency argument allows to control how many Publisher can be subscribed to and merged in parallel
Consider the following code using concurrency
= 500
Flux.range(0, 300)
.doOnNext(i -> System.out.println("i = " + i))
.flatMap(i -> Mono.just(i)
.subscribeOn(Schedulers.elastic())
.delayElement(Duration.ofMillis(1000)),
500
// ^^^^^^^^^^
)
.doOnNext(i -> System.out.println("end " + i))
.blockLast();
In this case there is no waiting:
i = 297
i = 298
i = 299
end 0
end 1
end 2
In contrast if you pass 1
as concurrency
the output will be similar to:
i = 0
end 0
i = 1
end 1
Awaiting one second before emitting the next element.
256 is the default value for concurrency of flatMap
.
Take a look at Queues.SMALL_BUFFER_SIZE
:
public static final int SMALL_BUFFER_SIZE = Math.max(16,
Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));