Given an Observable<Input> and a mapping function Function<Input, Output> that is expensive but takes variable time, is there a way to call the mapping function in parallel on multiple inputs, and receive the outputs in the order they're produced?
I've tried using observeOn() with a multi-threaded Scheduler:
PublishSubject<Input> inputs = PublishSubject.create();
Function<Input, Output> mf = ...
Observer<Output> myObserver1 = ...
Observer<Output> myObserver2 = ...
// Note: same results with newFixedThreadPool(2)
Executor exec = Executors.newWorkStealingPool();
// Use ConnectableObservable to make sure mf is called only once
// no matter how many downstream observers
ConnectableObservable<Output> outputs = inputs
    .observeOn(Schedulers.from(exec))
    .map(mf)
    .publish();
outputs.subscribe(myObserver1);
outputs.subscribe(myObserver2);
outputs.connect();
inputs.onNext(slowInput); // `mf.apply()` takes a long time to complete on this input
inputs.onNext(fastInput); // `mf.apply()` takes a short time to complete on this input
but in testing, mf.apply(fastInput) is never called until after mf.apply(slowInput) completes.
If I play some tricks in my test with CountDownLatch to ensure mf.apply(slowInput) can't complete until after mf.apply(fastInput), the program deadlocks.
Is there some simple operator I should be using here, or is getting Observables out of order just against the grain of RxJava, and I should be using a different technology?
ETA: I looked at using ParallelFlowable (converting it back to a plain Flowable with .sequential() before subscribing myObserver1/2, or rather mySubscriber1/2), but then I get extra mf.apply() calls, one per input per Subscriber. There's ConnectableFlowable, but I'm not having much luck figuring out how to mix it with .parallel().
I guess the observeOn operator does not support concurrent execution on its own: it hands every item to a single worker, so the items are still processed one after another. So, how about using flatMap? Assume the mf function takes a long time.
ConnectableObservable<Output> outputs = inputs
    .flatMap(it -> Observable.just(it)
        .observeOn(Schedulers.from(exec))
        .map(mf))
.publish();
or
ConnectableObservable<Output> outputs = inputs
    .flatMap(it -> Observable.just(it)
        .map(mf)
        // subscribeOn goes inside flatMap so that each input is mapped on its own worker
        .subscribeOn(Schedulers.from(exec)))
.publish();
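To make "outputs in the order they're produced" concrete without any RxJava dependency, here is a minimal sketch that uses only java.util.concurrent. It is an analogy for what the flatMap variants above aim for, not the RxJava pipeline itself. The class and method names (CompletionOrderDemo, runInCompletionOrder) are made up for illustration, and the latch plays the role of the CountDownLatch trick from the question: the slow task cannot finish until the fast result has been received.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; the names are illustrative and not part of RxJava.
final class CompletionOrderDemo {

    // Starts a slow task and then a fast task on two threads and returns
    // their results in the order the tasks finished.
    static List<String> runInCompletionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ExecutorCompletionService<String> finished = new ExecutorCompletionService<>(pool);
        CountDownLatch fastResultSeen = new CountDownLatch(1);
        try {
            // Submitted first, but blocked until the fast result has been received.
            finished.submit(() -> {
                fastResultSeen.await();
                return "slow";
            });
            finished.submit(() -> "fast");

            List<String> results = new ArrayList<>();
            results.add(finished.take().get()); // only the fast task can be done here
            fastResultSeen.countDown();         // now let the slow task finish
            results.add(finished.take().get());
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInCompletionOrder()); // prints [fast, slow]
    }
}
```

With Executors.newFixedThreadPool(1) the same code would hang, because the slow task would occupy the only thread while waiting for a fast result that never gets a chance to run. That is the same shape as the deadlock described in the question when all items go through one worker.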
Edit 2019-12-30
If you want to run the tasks concurrently but still need the outputs in the same order as the inputs, use the concatMapEager operator instead of flatMap.
ConnectableObservable<Output> outputs = inputs
    .concatMapEager(it -> Observable.just(it) // here
        .observeOn(Schedulers.from(exec))
        .map(mf))
.publish();
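The ordering that concatMapEager gives (start everything eagerly, deliver in input order) can be pictured with plain java.util.concurrent as well. This is only a rough sketch of the idea, not how RxJava implements the operator, and InputOrderDemo and runInInputOrder are hypothetical names. The slow task is forced to finish after the fast one, yet its result still comes out first because the results are read in submission order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; the names are illustrative and not part of RxJava.
final class InputOrderDemo {

    // Starts both tasks right away, then collects the results in the order
    // the tasks were submitted, regardless of which one finished first.
    static List<String> runInInputOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch fastRan = new CountDownLatch(1);
        try {
            // The slow task can only finish once the fast task has run,
            // which shows that the two really execute concurrently.
            Future<String> slow = pool.submit(() -> {
                fastRan.await();
                return "slow";
            });
            Future<String> fast = pool.submit(() -> {
                fastRan.countDown();
                return "fast";
            });

            List<Future<String>> inInputOrder = List.of(slow, fast);
            List<String> results = new ArrayList<>();
            for (Future<String> future : inInputOrder) {
                results.add(future.get()); // waits for this task, even if later ones are done
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInInputOrder()); // prints [slow, fast]
    }
}
```

The trade-off is the usual one: a fast result that is ready early still has to wait behind a slower, earlier input, so this ordering only makes sense when input order matters more than latency.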