Tags: concurrency, parallel-processing, reactive-programming, rx-java2

Parallelize map() operation on single Observable and receive results out of order


Given an Observable<Input> and a mapping function Function<Input, Output> that is expensive but takes variable time, is there a way to call the mapping function in parallel on multiple inputs, and receive the outputs in the order they're produced?

I've tried using observeOn() with a multi-threaded Scheduler:

PublishSubject<Input> inputs = PublishSubject.create();
Function<Input, Output> mf = ...
Observer<Output> myObserver1 = ...
Observer<Output> myObserver2 = ...

// Note: same results with newFixedThreadPool(2)
Executor exec = Executors.newWorkStealingPool();

// Use ConnectableObservable to make sure mf is called only once
// no matter how many downstream observers
ConnectableObservable<Output> outputs = inputs
    .observeOn(Schedulers.from(exec))
    .map(mf)
    .publish();
outputs.subscribe(myObserver1);
outputs.subscribe(myObserver2);
outputs.connect();

inputs.onNext(slowInput); // `mf.apply()` takes a long time to complete on this input
inputs.onNext(fastInput); // `mf.apply()` takes a short time to complete on this input

but in testing, mf.apply(fastInput) is never called until mf.apply(slowInput) has completed.

If I play some tricks in my test with CountDownLatch to ensure mf.apply(slowInput) can't complete until after mf.apply(fastInput), the program deadlocks.

Is there some simple operator I should be using here, or is getting Observables out of order just against the grain of RxJava, and I should be using a different technology?


ETA: I looked at using ParallelFlowable (converting it back to a plain Flowable with .sequential() before subscribing myObserver1/2, or rather mySubscriber1/2), but then I get extra mf.apply() calls, one per input per Subscriber. There's ConnectableFlowable, but I'm not having much luck figuring out how to mix it with .parallel().
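
One arrangement that might avoid the extra mf.apply() calls is to call publish() after .sequential(), so that the parallel section has a single upstream subscription shared by every Subscriber. The following is only a minimal sketch, assuming RxJava 2.x on the classpath; the class name, the sleeping stand-in for mf, the delays, and the two-thread pool are all hypothetical.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelPublishSketch {

    // Counts how often the stand-in mapping function runs.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical stand-in for mf: sleeps for the given time, then returns a label.
    static String mf(int millis) throws InterruptedException {
        CALLS.incrementAndGet();
        Thread.sleep(millis);
        return "out-" + millis;
    }

    // Returns the items seen by each of the two subscribers.
    static List<List<String>> run() throws InterruptedException {
        CALLS.set(0);
        ExecutorService exec = Executors.newFixedThreadPool(2);
        try {
            PublishSubject<Integer> inputs = PublishSubject.create();

            // publish() comes last, so both subscribers share one parallel pipeline.
            ConnectableFlowable<String> outputs = inputs
                .toFlowable(BackpressureStrategy.BUFFER)
                .parallel(2)
                .runOn(Schedulers.from(exec))
                .map(ParallelPublishSketch::mf)
                .sequential()
                .publish();

            List<String> seen1 = new CopyOnWriteArrayList<>();
            List<String> seen2 = new CopyOnWriteArrayList<>();
            CountDownLatch done = new CountDownLatch(2);
            outputs.subscribe(seen1::add, Throwable::printStackTrace, done::countDown);
            outputs.subscribe(seen2::add, Throwable::printStackTrace, done::countDown);
            outputs.connect();

            inputs.onNext(300); // slow input
            inputs.onNext(20);  // fast input
            inputs.onComplete();

            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
            return Arrays.asList(seen1, seen2);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run() + " after " + CALLS.get() + " mf calls");
    }
}
```

In this sketch the counter should end at one call per input even though two subscribers are attached, because connect() subscribes to the parallel section only once.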


Solution

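  • On its own, observeOn does not run items concurrently: it moves delivery onto a single worker of the scheduler, so map(mf) still receives one item at a time. How about using flatMap instead, giving each input its own inner Observable? Assume the mf function takes a long time.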

        ConnectableObservable<Output> outputs = inputs
            .flatMap(it -> Observable.just(it)
                .observeOn(Schedulers.from(exec))
                .map(mf))
            .publish();
    

    or

        ConnectableObservable<Output> outputs = inputs
            .flatMap(it -> Observable.just(it)
                .map(mf)
                .subscribeOn(Schedulers.from(exec)))
            .publish();
    

    Edit 2019-12-30

    If you want to run the tasks concurrently but still keep the outputs in input order, use the concatMapEager operator instead of flatMap.

        ConnectableObservable<Output> outputs = inputs
            .concatMapEager(it -> Observable.just(it) // here
                .observeOn(Schedulers.from(exec))
                .map(mf))
            .publish();
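
To see the difference between the two operators, the snippets above can be put into a small runnable program. This is only a sketch, assuming RxJava 2.x on the classpath; the class name, the sleeping stand-in for mf, the delays, and the two-thread pool are hypothetical and not taken from the question.

```java
import io.reactivex.Observable;
import io.reactivex.functions.Function;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OutOfOrderMapDemo {

    // Counts how often the stand-in mapping function runs.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical stand-in for mf: sleeps for the given time, then returns a label.
    static String mf(int millis) throws InterruptedException {
        CALLS.incrementAndGet();
        Thread.sleep(millis);
        return "out-" + millis;
    }

    // keepOrder = false uses flatMap, keepOrder = true uses concatMapEager.
    // Returns the items in the order the first subscriber received them.
    static List<String> run(boolean keepOrder) throws InterruptedException {
        CALLS.set(0);
        ExecutorService exec = Executors.newFixedThreadPool(2);
        try {
            PublishSubject<Integer> inputs = PublishSubject.create();

            // Each input gets its own inner Observable, so each mf call
            // is scheduled on the executor independently of the others.
            Function<Integer, Observable<String>> perItem = ms -> Observable.just(ms)
                .observeOn(Schedulers.from(exec))
                .map(OutOfOrderMapDemo::mf);

            Observable<String> mapped = keepOrder
                ? inputs.concatMapEager(perItem)
                : inputs.flatMap(perItem);
            ConnectableObservable<String> outputs = mapped.publish();

            List<String> seen1 = new CopyOnWriteArrayList<>();
            List<String> seen2 = new CopyOnWriteArrayList<>();
            CountDownLatch done = new CountDownLatch(2);
            outputs.subscribe(seen1::add, Throwable::printStackTrace, done::countDown);
            outputs.subscribe(seen2::add, Throwable::printStackTrace, done::countDown);
            outputs.connect();

            inputs.onNext(400); // slow input
            inputs.onNext(50);  // fast input
            inputs.onComplete();

            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
            return seen1;
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("flatMap:        " + run(false));
        System.out.println("concatMapEager: " + run(true));
    }
}
```

With these delays the flatMap run would be expected to deliver out-50 before out-400, and the concatMapEager run the reverse, while the counter stays at one mf call per input in both cases despite the two subscribers.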