RxJava2 - last successful Single from concat


I have N Single sources. I want to concatenate them and get the value of the last one that succeeds. For example, when the API call fails:

DB Single:

--[A]--|->

API Single:

---X--->

Single.concat(DB, API):

--[A]--|->

Otherwise, when both succeed:

DB Single:

--[A]--|->

API Single:

--[B]--|->

Single.concat(DB, API):

--[B]--|->

Is that possible? I read the documentation but didn't find anything like a "lastSuccessfulOrError()" method. I tried "elementAt", "lastOrError" and others, but their behavior is not what I'm looking for.

Thank you


Solution

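  • This can be achieved by subscribing to the sources in order, remembering the most recent success, swallowing errors, and emitting the remembered value once every source has finished. If no source succeeds, the result fails with NoSuchElementException: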

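    import io.reactivex.Observable;
    import io.reactivex.Single;
    import java.util.NoSuchElementException;
    import java.util.concurrent.atomic.AtomicReference;

    @SafeVarargs
    public static <T> Single<T> latestSuccess(Single<T>... sources) {
         // defer so that every subscriber gets its own AtomicReference
         return Single.defer(() -> {
             AtomicReference<T> last = new AtomicReference<>();
             return Observable.fromArray(sources)
                 // concatMap subscribes to the sources one at a time, in order
                 .concatMap(source ->
                      source.doOnSuccess(last::lazySet)  // remember each success
                      .toObservable()
                      .onErrorResumeNext(Observable.empty())  // skip failed sources
                 )
                 .ignoreElements()
                 // after all sources have finished, emit the last remembered value
                 .andThen(Single.fromCallable(() -> {
                     if (last.get() == null) {
                         // no source succeeded
                         throw new NoSuchElementException();
                     }
                     return last.get();
                 }));
         });
    }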
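
    To make the intended semantics easier to check without RxJava on the classpath, here is a minimal plain-Java sketch of the same rule. It is only a model, not part of the answer above: the class name LatestSuccessSketch and the use of Callable in place of Single are assumptions made for illustration, and everything runs synchronously.

```java
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;

// Hypothetical helper: models "last successful source wins" with plain Callables.
final class LatestSuccessSketch {
    private LatestSuccessSketch() {}

    static <T> T latestSuccess(List<Callable<T>> sources) {
        T last = null;
        for (Callable<T> source : sources) {
            try {
                T value = source.call();
                // A Single never emits null, so null is treated as "no value" here.
                if (value != null) {
                    last = value;
                }
            } catch (Exception e) {
                // A failed source is skipped, like onErrorResumeNext(Observable.empty()).
            }
        }
        if (last == null) {
            // No source produced a value.
            throw new NoSuchElementException("no source succeeded");
        }
        return last;
    }
}
```

    With a first source returning "A" and a second one throwing, the sketch returns "A"; with sources returning "A" and then "B", it returns "B". These are the two cases drawn in the question.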