
Concurrency behavior difference between flatMap and parallel with variable processing times


I am seeing different behavior in the following two pipelines, where I would expect similar or identical behavior. The intent is to process the items with a concurrency level of x (4 in the examples) and not block any items from processing in the 'buffer'.

I have recreated the scenario here, using Flowable.range(1, 1280) as the source and simulating some 'slow processing' on item 2 only, which blocks for 5 seconds.

Flowable.range(1, 1280) // cold flowable, items are produced 'on-demand'
    .doOnNext { logEvent("produced $it") }
    .parallel(4, 1) // parallelism is 4, prefetch is 1
    .runOn(Schedulers.computation(), 1) // again use a prefetch of 1
    .doOnNext(::process)
    .sequential()
    .doOnNext { logEvent("done with $it") }
    .ignoreElements()
    .blockingAwait()

For this I get output like:

...
2021-04-12T12:15:49.147 - [main] produced 4
2021-04-12T12:15:49.147 - [main] produced 5
2021-04-12T12:15:49.147 - [RxComputationThreadPool-2] slow processing 2
2021-04-12T12:15:49.147 - [RxComputationThreadPool-4] fast processing 4
...
2021-04-12T12:15:49.170 - [RxComputationThreadPool-1] fast processing 1278
2021-04-12T12:15:49.170 - [RxComputationThreadPool-1] done with 1278
2021-04-12T12:15:54.147 - [RxComputationThreadPool-2] slow processing 2 done
2021-04-12T12:15:54.147 - [RxComputationThreadPool-2] done with 2

The breakdown of number of items processed by each thread in this case is this:

RxComputationThreadPool-2: 1
RxComputationThreadPool-4: 429
RxComputationThreadPool-3: 416
RxComputationThreadPool-1: 434

Also note the timestamps in the logs: all items except 2 are processed within 1 second, and item 2 finishes after 5 seconds, as expected.

Now I expect to achieve similar behavior with this flatMap approach:

Flowable.range(1, 1280)
    .doOnNext { logEvent("produced $it") }
    .flatMapSingle({ Single.fromCallable { process(it); it }.subscribeOn(Schedulers.computation()) },
                   true, 4) // delayErrors (true or false doesn't matter), and maxConcurrency
    .doOnNext { logEvent("done with $it") }
    .ignoreElements()
    .blockingAwait()

However I get output like this:

...
2021-04-12T12:29:24.452 - [main] produced 4
2021-04-12T12:29:24.454 - [RxComputationThreadPool-1] fast processing 1
2021-04-12T12:29:24.454 - [RxComputationThreadPool-3] fast processing 3
2021-04-12T12:29:24.455 - [RxComputationThreadPool-1] done with 1
2021-04-12T12:29:24.455 - [RxComputationThreadPool-2] slow processing 2
2021-04-12T12:29:24.455 - [RxComputationThreadPool-1] produced 5
...
2021-04-12T12:29:24.458 - [RxComputationThreadPool-8] produced 25
2021-04-12T12:29:24.459 - [RxComputationThreadPool-1] fast processing 25
2021-04-12T12:29:24.459 - [RxComputationThreadPool-1] done with 25
2021-04-12T12:29:24.459 - [RxComputationThreadPool-1] produced 26
2021-04-12T12:29:29.455 - [RxComputationThreadPool-2] slow processing 2 done
2021-04-12T12:29:29.455 - [RxComputationThreadPool-2] done with 2
2021-04-12T12:29:29.455 - [RxComputationThreadPool-2] produced 27
2021-04-12T12:29:29.455 - [RxComputationThreadPool-2] fast processing 10
...
2021-04-12T12:29:29.477 - [RxComputationThreadPool-1] produced 1280
2021-04-12T12:29:29.477 - [RxComputationThreadPool-7] fast processing 1279
2021-04-12T12:29:29.477 - [RxComputationThreadPool-8] fast processing 1280
2021-04-12T12:29:29.477 - [RxComputationThreadPool-1] done with 1278
2021-04-12T12:29:29.477 - [RxComputationThreadPool-1] done with 1279
2021-04-12T12:29:29.477 - [RxComputationThreadPool-1] done with 1280

The number of items processed by each thread is always evenly distributed, like this:

RxComputationThreadPool-3: 160
RxComputationThreadPool-4: 160
RxComputationThreadPool-5: 160
RxComputationThreadPool-7: 160
RxComputationThreadPool-2: 160
RxComputationThreadPool-6: 160
RxComputationThreadPool-1: 160
RxComputationThreadPool-8: 160

Here, notice the time gap between produced 26 and slow processing 2 done. In fact, the 'single' for processing 26 doesn't run until 2 is done processing, even though 2 is the only item being processed at that point.

Even when maxConcurrency is set to 4, why is processing of 1 item blocking the processing of the other items when using flatMap?

Utilities used above, for reference:

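        import java.time.Instant
        import java.time.LocalDateTime
        import java.time.ZoneId
        import java.util.concurrent.ConcurrentHashMap
        import java.util.concurrent.ConcurrentLinkedDeque
        import java.util.concurrent.ConcurrentMap

        data class Event(val thread: Thread, val timeMillis: Long, val msg: String) {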
            val localTime: LocalDateTime by lazy (LazyThreadSafetyMode.NONE) {
                Instant.ofEpochMilli(timeMillis).atZone(ZoneId.systemDefault()).toLocalDateTime()
            }
            override fun toString(): String = "${localTime} - [${thread.name}] $msg"
        }
        val events: ConcurrentLinkedDeque<Event> = ConcurrentLinkedDeque()
        fun logEvent(msg: String) {
            events.add(Event(Thread.currentThread(), System.currentTimeMillis(), msg))
        }
        val countsByThread: ConcurrentMap<Thread, Int> = ConcurrentHashMap()
        fun process(it: Int) {
            val thread = Thread.currentThread()
            countsByThread.compute(thread) { _, prev -> if (prev != null) prev + 1 else 1 }
            if (it == 2) {
                logEvent("slow processing $it")
                Thread.sleep(5_000)
                logEvent("slow processing $it done")
            } else {
                logEvent("fast processing $it")
            }
        }

RxJava 2 version: 2.2.21


Solution

  • With the parallel setup, you get a fixed number of rails that demand more items as they progress. Since only one rail is bogged down for longer, the other 3 can keep requesting items and being served.

    With the flatMap setup, each item's inner Single is assigned to one of the computation Scheduler's worker threads in a round-robin fashion (called scheduler-1, scheduler-2, etc. here): item-1-scheduler-1, item-2-scheduler-2, ..., item-5-scheduler-1, item-6-scheduler-2. This illustration assumes 4 worker threads; the log in the question shows 8, so there the cycle wraps after 8 items instead of 4. If item-N bogs down a scheduler, then item-(N+4), item-(N+8), etc. have to wait for item-N to finish. Thus, for example, when item-5-scheduler-1 finishes, the next item is assigned to the blocked scheduler-2. After a few items, all 4 active items are waiting for scheduler-2, and nothing else is requested from the source until item-N completes.
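
    The round-robin effect described above can be sketched with a small model. This is not RxJava code, only a simplified illustration: the names threadFor and stuckItems and their parameters are hypothetical, and the model assumes that every item except the slow one completes instantly and that threads are handed out strictly in order. The thread count of 8 is taken from the log in the question.

```kotlin
// Hypothetical model of round-robin thread assignment, not RxJava internals.
// Threads are numbered 1..threadCount; item 1 goes to thread 1, and so on.
fun threadFor(item: Int, threadCount: Int): Int = (item - 1) % threadCount + 1

// Returns the items that occupy all concurrency slots when the pipeline stalls,
// assuming every item except slowItem finishes instantly (a simplification).
fun stuckItems(threadCount: Int, maxConcurrency: Int, slowItem: Int): List<Int> {
    val blockedThread = threadFor(slowItem, threadCount)
    val stuck = mutableListOf<Int>()
    var item = 1
    while (stuck.size < maxConcurrency) {
        // Items before slowItem ran before the thread was blocked, so only
        // slowItem and later items on the same thread get stuck.
        if (item >= slowItem && threadFor(item, threadCount) == blockedThread) {
            stuck.add(item)
        }
        item++
    }
    return stuck
}

fun main() {
    // 8 computation threads and maxConcurrency = 4, as in the question's log.
    println(stuckItems(threadCount = 8, maxConcurrency = 4, slowItem = 2))
    // prints [2, 10, 18, 26]
}
```

    Under these assumptions the fourth slot is taken by item 26, which is consistent with "produced 26" being the last line before the 5 second gap and with "fast processing 10" running on RxComputationThreadPool-2 right after item 2 finishes.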