I have a scenario with two observables, each of which returns an item from an API call.
What I want to do is:
I'd appreciate any hints on how to achieve that.
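Assuming both of your API calls return an Observable that emits items of type T, the first step is to modify these observables so that they time out.
You can do that using the amb and timer functions. amb mirrors whichever of its sources emits or terminates first, so racing each call against a timer that turns into an error after 500 ms gives you a timeout: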
// amb mirrors whichever source emits or terminates first: the API call or the timer-driven error.
Observable<T> apiCall1WithTimeout = Observable.amb(apiCall1(),
    Observable.timer(500, TimeUnit.MILLISECONDS)
        .flatMap(l -> Observable.<T>error(new Exception("Api1 timed out"))));
Observable<T> apiCall2WithTimeout = Observable.amb(apiCall2(),
    Observable.timer(500, TimeUnit.MILLISECONDS)
        .flatMap(l -> Observable.<T>error(new Exception("Api2 timed out"))));
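If it helps to see the race in isolation, here is a rough sketch of the same idea using only the JDK. It is an analogue, not RxJava: a CompletableFuture stands in for the observable and a scheduled task stands in for the timer. The class name, `withTimeout`, `fakeApi` and `describe` are hypothetical names made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Plain-JDK analogue of amb(call, timer-that-errors); all names are hypothetical.
final class AmbTimeoutSketch {
    // Daemon timer thread, standing in for Observable.timer.
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timeout-timer");
                t.setDaemon(true);
                return t;
            });

    // Races the call against the timer: whichever completes the future first wins.
    static <T> CompletableFuture<T> withTimeout(Supplier<T> call, long millis, String message) {
        CompletableFuture<T> result = CompletableFuture.supplyAsync(call);
        TIMER.schedule(
                () -> result.completeExceptionally(new TimeoutException(message)),
                millis, TimeUnit.MILLISECONDS);
        return result;
    }

    // Stand-in for an API call that takes delayMillis to answer.
    static Supplier<String> fakeApi(String value, long delayMillis) {
        return () -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        };
    }

    // Blocks for the outcome and renders it as text.
    static String describe(CompletableFuture<String> future) {
        try {
            return future.join();
        } catch (CompletionException e) {
            return "error: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(withTimeout(fakeApi("item1", 50), 500, "Api1 timed out")));
        System.out.println(describe(withTimeout(fakeApi("item2", 1500), 500, "Api2 timed out")));
    }
}
```

The first call answers well inside the 500 ms window, so its value wins the race. The second is still sleeping when the timer fires, so the future completes with the timeout error and the late value is ignored.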
The second step is to further modify your observables so that they do their work on different threads. This is done using the subscribeOn operator:
Observable<T> apiCall1WithTimeoutNewThread = apiCall1WithTimeout.subscribeOn(Schedulers.newThread());
Observable<T> apiCall2WithTimeoutNewThread = apiCall2WithTimeout.subscribeOn(Schedulers.newThread());
The final step is to merge the emissions from your observables while making sure that any timeout errors are emitted last. Luckily, the mergeDelayError operator does just that:
Observable<T> mergedCalls = Observable.mergeDelayError(apiCall1WithTimeoutNewThread, apiCall2WithTimeoutNewThread);
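To make the "values first, errors last" behaviour concrete, here is a small JDK-only sketch of the whole pipeline. Again, this is an analogue and not the RxJava API: `orTimeout` (Java 9+) plays the role of the timeout, `supplyAsync` plays the role of subscribeOn, and the hypothetical `mergeDelayError` helper collects results. One simplification I'm assuming is fine for illustration: values come back in argument order rather than arrival order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogue of mergeDelayError; all names are hypothetical. Requires Java 9+.
final class MergeDelayErrorSketch {
    // Starts a fake API call on a pool thread; it fails if it takes longer than 500 ms.
    static CompletableFuture<String> call(String value, long delayMillis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return value;
        }).orTimeout(500, TimeUnit.MILLISECONDS);
    }

    // Waits for every call; values come first (in argument order), errors are held back to the end.
    static List<String> mergeDelayError(List<CompletableFuture<String>> calls) {
        List<String> values = new ArrayList<>();
        List<String> errors = new ArrayList<>();
        for (CompletableFuture<String> call : calls) {
            try {
                values.add(call.join());
            } catch (CompletionException e) {
                errors.add("error: " + e.getCause().getClass().getSimpleName());
            }
        }
        values.addAll(errors);
        return values;
    }

    public static void main(String[] args) {
        // Both calls start immediately and run concurrently; the first one is too slow.
        List<CompletableFuture<String>> calls =
                Arrays.asList(call("api1 result", 1500), call("api2 result", 50));
        System.out.println(mergeDelayError(calls));
    }
}
```

With these delays the first call times out and the second succeeds, so the program should print `[api2 result, error: TimeoutException]`: the successful item is delivered even though the failing call was listed first.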