I'm currently working on using paralellism in a Flux. Right now I'm having problems with the backpressure. In our case we have a fast producing service we want to consume, but we are much slower. With a normal flux, this works so far, but we want to have parallelism. What I see when I'm using the approach with
.parallel(2)
.runOn(Schedulers.parallel())
that there is a big request on the beginning, which takes quite a long time to process. Here occurs also a different problem, that if we take too long to process, we somehow seem to generate a cancel event in the producer service (we consume it via webflux rest-call), but no cancel event is seen in the consumer.
But back to problem 1, how is it possible to bring this thing back to sync. I know of the prefetch parameter on the .parallel()
method, but it does not work as I expect.
A minimum example would be something like this
fun main() {
val atomicInteger = AtomicInteger(0)
val receivedCount = AtomicInteger(0)
val processedCount = AtomicInteger(0)
Flux.generate<Int> {
it.next(atomicInteger.getAndIncrement())
println("Emitted ${atomicInteger.get()}")
}.doOnEach { it.get()?.let { receivedCount.addAndGet(1) } }
.parallel(2, 1)
.runOn(Schedulers.parallel())
.flatMap {
Thread.sleep(200)
log("Work on $it")
processedCount.addAndGet(1)
Mono.just(it * 2)
}.subscribe {
log("Received ${receivedCount.get()} and processed ${processedCount.get()}")
}
Thread.sleep(25000)
}
where I can observe logs like this
...
Emitted 509
Emitted 510
Emitted 511
Emitted 512
Emitted 513
2022-02-02T14:12:58.164465Z - Thread[parallel-1,5,main] Work on 0
2022-02-02T14:12:58.168469Z - Thread[parallel-2,5,main] Work on 1
2022-02-02T14:12:58.241966Z - Thread[parallel-1,5,main] Received 513 and processed 2
2022-02-02T14:12:58.241980Z - Thread[parallel-2,5,main] Received 513 and processed 2
2022-02-02T14:12:58.442218Z - Thread[parallel-2,5,main] Work on 3
2022-02-02T14:12:58.442215Z - Thread[parallel-1,5,main] Work on 2
2022-02-02T14:12:58.442315Z - Thread[parallel-2,5,main] Received 513 and processed 3
2022-02-02T14:12:58.442338Z - Thread[parallel-1,5,main] Received 513 and processed 4
So how could I adjust that thing that I can use the parallelism but stay in backpressure/sync with my producer? The only way I got it to work is with a semaphore acquired before the parallelFlux and released after work, but this is not really a nice solution.
Ok for this szenario it seemed crucial that prefetch of parallel and runOn had to bet seen very low, here to 1. With defaults from 256, we requested too much from our producer, so that there was already a cancel event because of the long time between the first block of requests for getting the prefetch and the next one when the Flux decided to fill the buffer again.