Tags: rxjs, rx-java, rx-java2

RxJava / RxJs: How to merge two source observables but complete as soon as one of them completes


I have two source observables. I would like to merge them, but the merged observable should complete as soon as either source observable completes.

Desired behavior:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4--x

In case of an error on one of the sources, the error should propagate to the merged observable:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------e
"merged"  ---1---2----3--4--ex

The "merge" operator only completes the merged stream when both sources have completed:

Source 1: ---1--------3--4-----------------------------x
Source 2: -------2----------x
"merged"  ---1---2----3--4-----------------------------x

How can I achieve my desired behavior?


Solution

  • You need to work with each observable's notifications (its values, errors, and completion signal) rather than only its values. To do this, use the materialize() operator on each stream and then use dematerialize() on the merged stream to emit the actual data.

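    Observable.merge( observableA.materialize(),
                      observableB.materialize() )
      // Stop at the first completion notification. Error notifications pass
      // through, so dematerialize() re-emits them as a real error.
      // (RxJava 1 naming; in RxJava 2 the method is isOnComplete().)
      .takeWhile( notification -> !notification.isOnCompleted() )
      .dematerialize()
      .subscribe( ... );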
    

    This will merge the two observables until either one of them completes or emits an error.
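
The desired behavior from the question can also be modeled in plain Java, without RxJava on the classpath. This is only a sketch: the `Event` class, the `Kind` enum, and the `mergedOutput` method are hypothetical names invented for this illustration, not RxJava APIs. The input list stands in for the merged, time-ordered stream of materialized notifications, and the loop plays the role of takeWhile followed by dematerialize.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MergeUntilFirstCompletes {

    // Hypothetical stand-in for the three kinds of Rx notification.
    enum Kind { NEXT, ERROR, COMPLETE }

    // Hypothetical stand-in for a materialized notification.
    static final class Event {
        final Kind kind;
        final Integer value;   // set only for NEXT
        final String error;    // set only for ERROR

        private Event(Kind kind, Integer value, String error) {
            this.kind = kind;
            this.value = value;
            this.error = error;
        }

        static Event next(int v)       { return new Event(Kind.NEXT, v, null); }
        static Event error(String msg) { return new Event(Kind.ERROR, null, msg); }
        static Event complete()        { return new Event(Kind.COMPLETE, null, null); }
    }

    // Replays a merged timeline and returns what a subscriber would observe.
    static List<String> mergedOutput(List<Event> timeline) {
        List<String> out = new ArrayList<>();
        for (Event e : timeline) {
            if (e.kind == Kind.COMPLETE) {
                // First completion from either source ends the merged stream.
                out.add("complete");
                return out;
            }
            if (e.kind == Kind.ERROR) {
                // An error notification becomes a terminal error downstream.
                out.add("error:" + e.error);
                return out;
            }
            // A value notification becomes a plain item again.
            out.add(String.valueOf(e.value));
        }
        return out; // no terminal event seen yet: the stream is still open
    }

    public static void main(String[] args) {
        // Source 2 completes after 4; the later completion of source 1 is never reached.
        List<Event> completes = Arrays.asList(
                Event.next(1), Event.next(2), Event.next(3), Event.next(4),
                Event.complete(), Event.complete());
        System.out.println(mergedOutput(completes)); // [1, 2, 3, 4, complete]

        // Source 2 fails after 4; the error is propagated and ends the stream.
        List<Event> fails = Arrays.asList(
                Event.next(1), Event.next(2), Event.next(3), Event.next(4),
                Event.error("boom"), Event.complete());
        System.out.println(mergedOutput(fails)); // [1, 2, 3, 4, error:boom]
    }
}
```

The two timelines correspond to the first two marble diagrams in the question: in both cases nothing after the first terminal event from either source reaches the subscriber.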