android rx-java retrofit2 reactivex

Conditional chain of observables


I want to asynchronously retrieve data via multiple REST APIs. I'm using Retrofit on Android with the RxJava adapter, i.e. I execute every GET request by subscribing to an Observable.

As I said, I have multiple source APIs. When the first source does not yield the desired result, I want to try the next one; if that also fails, I try the one after it, and so forth, until either a result has been found or all sources have been queried.

I'm struggling to translate this approach into proper use of Observables since I don't know which operators can achieve this behaviour and there are also some constraints to honor:

  • when a result has been found, the remaining APIs, if any, should not be queried
  • other components depend on the result of the query, so I want to hand them an Observable when the request starts that notifies them once the request has completed
  • I need to keep a reference to that Observable, because the same request may be made again before it has finished; in that case the request is started only the first time, and subsequent callers just receive the existing Observable, which notifies them when the request finishes

I was starting out with only one API to query and used the following for the request and subsequent notification of dependent components:

private Observable<String> loadData(int jobId) {

    final ConnectableObservable<String> result = Async
            .fromCallable(() -> getResult(jobId))
            .publish();

    getRestRequest()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    dataHolder -> {
                        if (dataHolder.getData() != null && !dataHolder.getData().isEmpty()) {
                            saveData(dataHolder.getData());
                        } else {
                            markNotFound(dataHolder);
                        }
                    },
                    error -> currentJobs.remove(jobId),
                    () -> {
                        currentJobs.remove(jobId);
                        result.connect();
                    });

    return result;
}

This code was only called for the first request. The returned Observable was then stored in currentJobs, and subsequent requests received that stored Observable without triggering the request again.

Any help is highly appreciated.


Solution

  • Assuming you have a list of cold observables, i.e. each one issues its request anew every time it is subscribed to:

    List<Observable<Result>> suppliers = ...
    

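    Then concatenate them in order, take the first acceptable result, and cache it so that later subscribers share the same outcome. concatMap subscribes to the next source only after the previous one has completed, and takeFirst unsubscribes as soon as a result passes the check, so the remaining sources are never queried: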

    Observable<Result> results = Observable
              .from(suppliers)
              .concatMap(supplier -> supplier)
              .takeFirst(result -> isAcceptable(result))
              .cache();
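
To make the intended semantics concrete without pulling in RxJava, here is a rough JDK-only sketch of the same idea. It is an analogue, not the Rx chain itself: `CompletableFuture` stands in for the cached Observable, and `FallbackLoader`, `firstAcceptable` and `load` are hypothetical names invented for this illustration. It assumes each source can be modelled as a blocking `Supplier`.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.Supplier;

// JDK-only analogue of the Rx chain above; every name here is hypothetical.
final class FallbackLoader<R> {
    private final List<Supplier<R>> sources;   // queried lazily, in list order
    private final Predicate<R> isAcceptable;   // plays the role of isAcceptable(result)
    // One shared future per job id, similar to the Observable kept in currentJobs.
    private final Map<Integer, CompletableFuture<Optional<R>>> currentJobs =
            new ConcurrentHashMap<>();

    FallbackLoader(List<Supplier<R>> sources, Predicate<R> isAcceptable) {
        this.sources = sources;
        this.isAcceptable = isAcceptable;
    }

    // Tries each source in turn and stops at the first acceptable result,
    // so later sources are never called once a result has been found.
    Optional<R> firstAcceptable() {
        for (Supplier<R> source : sources) {
            R result = source.get();
            if (result != null && isAcceptable.test(result)) {
                return Optional.of(result);
            }
        }
        return Optional.empty(); // all sources queried, nothing acceptable
    }

    // Returns the in-flight future for jobId; only the first caller starts the chain.
    CompletableFuture<Optional<R>> load(int jobId) {
        CompletableFuture<Optional<R>> fresh = new CompletableFuture<>();
        CompletableFuture<Optional<R>> existing = currentJobs.putIfAbsent(jobId, fresh);
        if (existing != null) {
            return existing; // request already running, just share its future
        }
        CompletableFuture.supplyAsync(this::firstAcceptable).whenComplete((result, error) -> {
            currentJobs.remove(jobId, fresh); // job finished, allow a later fresh request
            if (error != null) {
                fresh.completeExceptionally(error);
            } else {
                fresh.complete(result);
            }
        });
        return fresh;
    }
}
```

Calling `load(jobId)` twice while the first call is still running returns the same future, and the sources are walked only once. Note that in this sketch an exception thrown by a source fails the whole job instead of falling through to the next source.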