I am seeing different behavior in the following two pipelines where I would expect the same (or at least similar) behavior. The intent is to process the items with a concurrency level of x (4 in the examples), without any buffered item blocking the others from being processed.
I have recreated the scenario here to play with, using a Flowable.range(1, 1280) as the source and simulating 'slow processing' on item 2 only, which blocks for 5 seconds.
Flowable.range(1, 1280) // cold flowable, items are produced on demand
    .doOnNext { logEvent("produced $it") }
    .parallel(4, 1) // parallelism is 4, prefetch is 1
    .runOn(Schedulers.computation(), 1) // again use a prefetch of 1
    .doOnNext(::process)
    .sequential()
    .doOnNext { logEvent("done with $it") }
    .ignoreElements()
    .blockingAwait()
For this I get output like:
...
2021-04-12T12:15:49.147 - [main] produced 4
2021-04-12T12:15:49.147 - [main] produced 5
2021-04-12T12:15:49.147 - [RxComputationThreadPool-2] slow processing 2
2021-04-12T12:15:49.147 - [RxComputationThreadPool-4] fast processing 4
...
2021-04-12T12:15:49.170 - [RxComputationThreadPool-1] fast processing 1278
2021-04-12T12:15:49.170 - [RxComputationThreadPool-1] done with 1278
2021-04-12T12:15:54.147 - [RxComputationThreadPool-2] slow processing 2 done
2021-04-12T12:15:54.147 - [RxComputationThreadPool-2] done with 2
The breakdown of the number of items processed by each thread in this case is:
RxComputationThreadPool-2: 1
RxComputationThreadPool-4: 429
RxComputationThreadPool-3: 416
RxComputationThreadPool-1: 434
Also note the times in the logs: all items except 2 are processed within 1 second, and item 2 finishes after 5 seconds, as expected.
Now I expect to achieve similar behavior with this flatMap approach:
Flowable.range(1, 1280)
    .doOnNext { logEvent("produced $it") }
    .flatMapSingle({ Single.fromCallable { process(it); it }.subscribeOn(Schedulers.computation()) },
        true, 4) // delayErrors (true or false doesn't matter), and maxConcurrency
    .doOnNext { logEvent("done with $it") }
    .ignoreElements()
    .blockingAwait()
However, I get output like this:
...
2021-04-12T12:29:24.452 - [main] produced 4
2021-04-12T12:29:24.454 - [RxComputationThreadPool-1] fast processing 1
2021-04-12T12:29:24.454 - [RxComputationThreadPool-3] fast processing 3
2021-04-12T12:29:24.455 - [RxComputationThreadPool-1] done with 1
2021-04-12T12:29:24.455 - [RxComputationThreadPool-2] slow processing 2
2021-04-12T12:29:24.455 - [RxComputationThreadPool-1] produced 5
...
2021-04-12T12:29:24.458 - [RxComputationThreadPool-8] produced 25
2021-04-12T12:29:24.459 - [RxComputationThreadPool-1] fast processing 25
2021-04-12T12:29:24.459 - [RxComputationThreadPool-1] done with 25
2021-04-12T12:29:24.459 - [RxComputationThreadPool-1] produced 26
2021-04-12T12:29:29.455 - [RxComputationThreadPool-2] slow processing 2 done
2021-04-12T12:29:29.455 - [RxComputationThreadPool-2] done with 2
2021-04-12T12:29:29.455 - [RxComputationThreadPool-2] produced 27
2021-04-12T12:29:29.455 - [RxComputationThreadPool-2] fast processing 10
...
2021-04-12T12:29:29.477 - [RxComputationThreadPool-1] produced 1280
2021-04-12T12:29:29.477 - [RxComputationThreadPool-7] fast processing 1279
2021-04-12T12:29:29.477 - [RxComputationThreadPool-8] fast processing 1280
2021-04-12T12:29:29.477 - [RxComputationThreadPool-1] done with 1278
2021-04-12T12:29:29.477 - [RxComputationThreadPool-1] done with 1279
2021-04-12T12:29:29.477 - [RxComputationThreadPool-1] done with 1280
The breakdown of the number of items processed by each thread is always evenly distributed, like this:
RxComputationThreadPool-3: 160
RxComputationThreadPool-4: 160
RxComputationThreadPool-5: 160
RxComputationThreadPool-7: 160
RxComputationThreadPool-2: 160
RxComputationThreadPool-6: 160
RxComputationThreadPool-1: 160
RxComputationThreadPool-8: 160
Here, notice the time gap between produced 26 and slow processing 2 done; in fact, the Single for processing item 26 isn't even created until item 2 is done processing, even though 2 is the only item still being processed. When maxConcurrency is set to 4, why does the processing of one item block the processing of the other items when using flatMap?
Utilities used above, for reference:
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneId
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.ConcurrentMap

data class Event(val thread: Thread, val timeMillis: Long, val msg: String) {
    val localTime: LocalDateTime by lazy(LazyThreadSafetyMode.NONE) {
        Instant.ofEpochMilli(timeMillis).atZone(ZoneId.systemDefault()).toLocalDateTime()
    }

    override fun toString(): String = "$localTime - [${thread.name}] $msg"
}

val events: ConcurrentLinkedDeque<Event> = ConcurrentLinkedDeque()

fun logEvent(msg: String) {
    events.add(Event(Thread.currentThread(), System.currentTimeMillis(), msg))
}

val countsByThread: ConcurrentMap<Thread, Int> = ConcurrentHashMap()

fun process(it: Int) {
    val thread = Thread.currentThread()
    countsByThread.compute(thread) { _, prev -> if (prev != null) prev + 1 else 1 }
    if (it == 2) {
        logEvent("slow processing $it")
        Thread.sleep(5_000)
        logEvent("slow processing $it done")
    } else {
        logEvent("fast processing $it")
    }
}
RxJava 2 version 2.2.21
With the parallel setup, you get a fixed number of rails, each of which demands more items as it progresses. Since only one rail is bogged down, the other 3 can keep requesting and being served.
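As a rough illustration (plain JDK threads, not RxJava internals), the demand-driven rail behavior can be modeled as workers that each pull their next item from a shared source only when they are free. The function name, the 500 ms sleep (shortened from the 5 s in the question), and the shared-queue model are all assumptions for the sketch:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.LinkedBlockingQueue

// Simplified model of the parallel() setup: each of the "rails" pulls
// its next item on demand, so a slow item holds up only its own rail.
fun runDemandDriven(itemCount: Int, rails: Int, slowItem: Int): Map<Int, Int> {
    val source = LinkedBlockingQueue((1..itemCount).toList())
    val counts = ConcurrentHashMap<Int, Int>()
    val threads = (0 until rails).map { rail ->
        Thread {
            while (true) {
                val item = source.poll() ?: break // request the next item on demand
                if (item == slowItem) Thread.sleep(500) // blocks only this rail
                counts.merge(rail, 1, Int::plus)
            }
        }.apply { start() }
    }
    threads.forEach { it.join() }
    return counts
}

fun main() {
    // The rail stuck on the slow item handles very few items; the other
    // rails keep requesting and finish the rest, as in the logs above.
    println(runDemandDriven(1280, 4, 2))
}
```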
With the flatMap setup, each item gets assigned to a Scheduler worker in a round-robin fashion: item-1-scheduler-1, item-2-scheduler-2, ..., item-5-scheduler-1, item-6-scheduler-2. If item N bogs down a scheduler, then items N+4, N+8, etc. will be waiting for item N to finish. Thus, for example, when item-5-scheduler-1 finishes, the next item will be assigned to the blocked scheduler 2. After a few items, all 4 active items end up waiting on scheduler 2.
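The pinning effect can be sketched with plain single-thread executors standing in for scheduler workers (this is a simplified model, not RxJava's implementation; the function name, the 500 ms sleep, and the fixed modulo assignment are assumptions):

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Simplified model of the flatMap setup: every item is bound up front to
// a worker in round-robin order, the way subscribeOn(Schedulers.computation())
// picks a worker per inner Single.
fun runRoundRobin(itemCount: Int, workerCount: Int, slowItem: Int): Map<Int, Int> {
    val workers = Array(workerCount) { Executors.newSingleThreadExecutor() }
    val counts = ConcurrentHashMap<Int, Int>()
    for (item in 1..itemCount) {
        val index = (item - 1) % workerCount // fixed assignment, no stealing
        workers[index].execute {
            // the slow item delays items slowItem + workerCount,
            // slowItem + 2 * workerCount, ... queued on the same worker
            if (item == slowItem) Thread.sleep(500)
            counts.merge(index, 1, Int::plus)
        }
    }
    workers.forEach { it.shutdown() }
    workers.forEach { it.awaitTermination(10, TimeUnit.SECONDS) }
    return counts
}

fun main() {
    // Every worker ends up with exactly itemCount / workerCount items,
    // mirroring the perfectly even per-thread split in the question.
    println(runRoundRobin(40, 4, 2))
}
```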