I'm trying to get my bearings on some details of RxJava concurrency, and I'm not sure whether what I have in mind is correct. I have a good understanding of how subscribeOn/observeOn work, but I'm trying to nail down some particulars of the pool schedulers. To that end, I'm looking at implementing a 1-N producer-consumer chain, with as many consumers as CPUs, as simply as possible.
According to the docs, Schedulers.computation() is backed by a pool of as many threads as cores. However, per the Reactive contract, an operator only ever gets sequential calls.
Hence, a setup like this one
Observable.range(1, 1000) // Whatever has to be processed
    .observeOn(Schedulers.computation())
    .doOnNext(v -> { /* heavy computation */ })
    .doOnCompleted(() -> System.out.println("COMPLETED"))
    .forEach(System.out::println);
despite using a thread pool, will never receive concurrent calls to doOnNext: only one call runs at a time. Experiments with sleep and inspecting OperatorObserveOn.java seem to confirm this, since a single worker is obtained per observeOn call. Also, if it were otherwise, there would have to be some complicated management of onCompleted, which would have to wait for any pending onNext to complete, and I can't find anything like that.
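For what it's worth, the same one-call-at-a-time behaviour can be reproduced without RxJava. Below is a minimal sketch, assuming JDK 9+ and using the JDK's own java.util.concurrent.Flow / SubmissionPublisher as a stand-in for observeOn on a pooled scheduler. It is a different library, not RxJava's implementation, and the class name, method name and the 200 µs pause are made up for illustration. It counts how many onNext calls ever overlap while delivery runs on a 4-thread pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class: a JDK Flow stand-in, NOT RxJava.
final class SequentialDelivery {

    /** Publishes 1..count via a pool-backed publisher; returns the peak number of overlapping onNext calls. */
    static int peakConcurrentOnNext(int count, int poolThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(poolThreads);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        try {
            try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(pool, Flow.defaultBufferSize())) {
                publisher.subscribe(new Flow.Subscriber<Integer>() {
                    @Override public void onSubscribe(Flow.Subscription s) {
                        s.request(Long.MAX_VALUE); // no backpressure in this sketch
                    }
                    @Override public void onNext(Integer item) {
                        // Track how many onNext calls are active right now.
                        peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                        LockSupport.parkNanos(200_000L); // stand-in for heavy work
                        inFlight.decrementAndGet();
                    }
                    @Override public void onError(Throwable t) { done.countDown(); }
                    @Override public void onComplete() { done.countDown(); }
                });
                for (int i = 1; i <= count; i++) {
                    publisher.submit(i);
                }
            } // close() signals onComplete once buffered items are delivered
            done.await();
            return peak.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent onNext calls: " + peakConcurrentOnNext(50, 4));
    }
}
```

Even with four pool threads available, the subscriber should report a peak of 1, which matches what I see with observeOn: the pool decides where each call runs, not how many run at once.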
Supposing I'm on the right track here (that is, that only one thread is involved at a time, although you can jump between several of them with observeOn), what would then be the proper pattern? I can find examples for the converse situation (synchronizing several async event generators into one consumer), but not a simple example for this typical case.
I guess flatMap is involved, perhaps using the beta version (in 1.x) that limits the number of concurrent subscriptions. Could it perhaps be as simple as using window/flatMap like this?
Observable
    .range(1, 1000) // Whatever has to be processed
    .window(1) // Emit one observable per item, for example
    .flatMap(/* Processing */, 4) // For 4-concurrent processing
    .subscribe();
With this approach I'm still missing an easy way of maxing out the CPU in an Rx-generic way (that is, by specifying the computation Scheduler instead of a maximum number of subscriptions for flatMap). So, perhaps...:
Observable
    .range(1, 1000) // Whatever has to be processed
    .window(1) // Emit one observable per item, for example
    .flatMap(v -> Observable.just(v)
        .observeOn(Schedulers.computation())
        .map(/* heavy parallel computation */))
    .subscribe();
Lastly, in some examples with flatMap I see a toBlocking() call after flatMap, and I'm not sure why it is needed: shouldn't flatMap be performing the serialization for downstream? (E.g. in this example: http://akarnokd.blogspot.com.es/2016/02/flatmap-part-1.html)
There's a good article by Thomas Nield on exactly that case: "RxJava - Maximizing Parallelization".
What I personally do in that case is subscribe on Schedulers.io() inside a flatMap, passing the maximum-concurrency parameter:
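Observable.range(1, 1000) // Whatever has to be processed
    .flatMap(v -> Observable.fromCallable(() -> { /* io calls */ return v; }) // a Callable must return a value; v is a placeholder
        .subscribeOn(Schedulers.io()), Runtime.getRuntime().availableProcessors() + 1) // at most cores + 1 inner subscriptions at once
    .subscribe();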
EDIT
As per the suggestion in the comments, it's better to use Schedulers.computation() for CPU-bound work:
Observable.range(1, 1000) // Whatever has to be processed
    .flatMap(v -> Observable.fromCallable(() -> { /* intense calculation */ return v; }) // a Callable must return a value; v is a placeholder
        .subscribeOn(Schedulers.computation()))
    .subscribe();
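To make the shape of that fan-out easier to see, here is a rough plain-JDK analogue: one producer loop handing items to a fixed pool with as many threads as cores, then collecting the results. This is only a sketch of the idea, not RxJava and not how Schedulers.computation() is implemented; BoundedFanOut, Result and the squaring "calculation" are hypothetical names and placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: an ExecutorService analogue of the flatMap fan-out, NOT RxJava.
final class BoundedFanOut {

    /** Sum of all task results plus the peak number of tasks that ran at the same time. */
    static final class Result {
        final long sum;
        final int peak;
        Result(long sum, int peak) { this.sum = sum; this.peak = peak; }
    }

    static Result run(int items, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 1; i <= items; i++) {
                final long v = i;
                Callable<Long> task = () -> {
                    // Record how many tasks are active at this moment.
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        return v * v; // placeholder for the intense calculation
                    } finally {
                        running.decrementAndGet();
                    }
                };
                futures.add(pool.submit(task));
            }
            long sum = 0;
            for (Future<Long> f : futures) {
                sum += f.get(); // wait for each result
            }
            return new Result(sum, peak.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        Result r = run(1000, cores);
        System.out.println("sum=" + r.sum + ", peak concurrent tasks=" + r.peak + " (pool size " + cores + ")");
    }
}
```

The pool size plays the role of the concurrency limit here: however many items are queued, no more than `workers` of them are ever being processed at once.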