RxJava - One producer, many concurrent consumers in single subscription


I'm trying to get my bearings around some details of RxJava concurrency and I'm not sure if what I have in mind is correct. I have a good understanding of how subscribeOn/observeOn work, but I'm trying to nail down some particulars of the pool schedulers. For that, I'm looking at implementing, as simply as possible, a 1-N producer-consumer chain with as many consumers as CPUs.

According to docs, Schedulers.computation() is backed by a pool of as many threads as cores. However, per the Reactive contract, an operator gets only sequential calls.

Hence, a setup like this one

Observable.range(1, 1000) // Whatever has to be processed
            .observeOn(Schedulers.computation())
            .doOnNext(/* heavy computation */)
            .doOnCompleted(() -> System.out.println("COMPLETED"))
            .forEach(System.out::println);

despite using a thread pool, will only ever run one doOnNext call at a time. Experiments with sleep and inspecting OperatorObserveOn.java seem to confirm this, since a single worker is obtained per observeOn call. Also, if it were otherwise, there would have to be some complicated management of onCompleted having to wait for any pending onNext to complete, which I can't find anywhere.
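The "one worker per observeOn" behaviour described above can be illustrated without RxJava at all. The following is only a rough analogy in plain java.util.concurrent, not RxJava's actual implementation: a one-thread executor stands in for the single worker, and the class and method names (SingleWorkerSketch, maxConcurrency) are hypothetical. It counts how many tasks are ever running at the same moment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SingleWorkerSketch {

    /** Runs `items` short tasks on `threads` threads; returns the peak number running at once. */
    static int maxConcurrency(int threads, int items) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        for (int i = 0; i < items; i++) {
            executor.execute(() -> {
                int now = running.incrementAndGet();
                max.accumulateAndGet(now, Math::max); // remember the peak
                try {
                    Thread.sleep(5); // stand-in for the heavy computation
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return max.get();
    }

    public static void main(String[] args) {
        // One worker, comparable to the single worker behind one observeOn call: prints 1.
        System.out.println(maxConcurrency(1, 20));
        // Four workers: tasks may overlap, but never more than 4 at a time.
        System.out.println(maxConcurrency(4, 20));
    }
}
```

With a single worker the tasks are strictly sequential regardless of how many threads the underlying pool owns, which matches the sleep experiments mentioned above.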

Supposing I'm on the right track here (that is, that only one thread is involved at any given time, although you can hop between several of them with observeOn), what would be the proper pattern then? I can find examples for the converse situation (synchronizing several async event generators into one consumer) but not a simple example for this typical case.

I guess flatMap is involved, perhaps using the beta overload (in 1.x) that limits the number of concurrent subscriptions. Could it be as simple as using window/flatMap like this?

Observable
    .range(1, 1000) // Whatever has to be processed
    .window(1) // Emit one observable per item, for example
    .flatMap(/* Processing */, 4) // For 4-concurrent processing
    .subscribe();

In this approach I'm still missing an easy way of maxing the CPU in a Rx-generic way (that is, specifying the computation Scheduler instead of a maximum of subscriptions to flatMap). So, perhaps...:

Observable
    .range(1, 1000) // Whatever has to be processed
    .window(1) // Emit one observable per item, for example
    // Each window w is already an Observable, so it is used directly
    .flatMap(w -> w.observeOn(Schedulers.computation())
                   .map(/* heavy parallel computation */))
    .subscribe();

Lastly, in some examples with flatMap I see a toBlocking() call after flatMap, and I'm not sure why it is needed: shouldn't flatMap be performing the serialization for downstream? (E.g. in this example: http://akarnokd.blogspot.com.es/2016/02/flatmap-part-1.html)


Solution

  • There's a good article by Thomas Nield on exactly that case

    RxJava - Maximizing Parallelization

    What I personally do in that case is subscribe each inner Observable on Schedulers.io() inside a flatMap, passing a maximum-concurrency argument.

        Observable.range(1, 1000) // Whatever has to be processed
                .flatMap(v -> Observable.fromCallable(() -> {
                            /* io calls */
                            return v; // placeholder result; a Callable must return a value
                        })
                        .subscribeOn(Schedulers.io()), Runtime.getRuntime().availableProcessors() + 1)
                .subscribe();
    

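    EDIT: as suggested in the comments, it's better to use Schedulers.computation() for CPU-bound work. Note that this version omits the maximum-concurrency argument; the computation pool is itself limited to the number of cores.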

        Observable.range(1, 1000) // Whatever has to be processed
                .flatMap(v -> Observable.fromCallable(() -> {
                            /* intense calculation */
                            return v; // placeholder result; a Callable must return a value
                        })
                        .subscribeOn(Schedulers.computation()))
                .subscribe();
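For readers who want to see what the flatMap plus subscribeOn pattern above amounts to, here is a minimal sketch of the same shape in plain java.util.concurrent. It is an analogy under stated assumptions, not RxJava code and not how flatMap is implemented: each item becomes an independent task on a bounded pool, and one consuming thread collects results in completion order. The names BoundedFanOutSketch, heavy and process are hypothetical, and squaring stands in for the intense calculation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BoundedFanOutSketch {

    /** Hypothetical stand-in for the intense calculation applied to each item. */
    static long heavy(int v) {
        return (long) v * v;
    }

    /** Processes `count` integers starting at `start` on at most `workers` threads. */
    static List<Long> process(int start, int count, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        CompletionService<Long> completed = new ExecutorCompletionService<>(pool);
        try {
            // Producer side: one task per item, like one inner Observable per item.
            for (int v = start; v < start + count; v++) {
                final int item = v;
                completed.submit(() -> heavy(item));
            }
            // Consumer side: a single thread receives results one at a time,
            // in completion order rather than in source order.
            List<Long> results = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                results.add(completed.take().get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int workers = Runtime.getRuntime().availableProcessors();
        List<Long> results = process(1, 1000, workers);
        System.out.println(results.size() + " items processed");
    }
}
```

As in the flatMap version, the order of results is not guaranteed to match the order of the source items; only the downstream delivery is sequential.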