Tags: java, android, rx-java, rx-android

How to set a timeout for multiple observables


I have two observables, each of which returns an item from an API call.

What I want to do is:

  • Execute both API calls concurrently.
  • Set a timeout (500 milliseconds) for both calls to finish.
  • Receive the results once they finish: if both complete within the timeout, I want both results; if only one completes, I want its result plus an error or a null value for the other.
  • If neither finishes within the timeout, I want to receive an error.

I'd appreciate any hints on how to achieve that.


Solution

  • Assuming both of your API calls return an Observable that emits items of type T, the first step is to modify these observables so that they time out. You can do that with the amb and timer operators: amb mirrors whichever source emits first, so each call races against a 500 ms timer that turns into an error.

    // Race each call against a timer; the explicit <T> helps type inference.
    Observable<T> apiCall1WithTimeout = Observable.amb(apiCall1(),
                                                       Observable.timer(500, TimeUnit.MILLISECONDS)
                                                                 .flatMap(l -> Observable.<T>error(new Exception("Api1 timed out"))));
    Observable<T> apiCall2WithTimeout = Observable.amb(apiCall2(),
                                                       Observable.timer(500, TimeUnit.MILLISECONDS)
                                                                 .flatMap(l -> Observable.<T>error(new Exception("Api2 timed out"))));
    

    The second step is to make the observables do their work on different threads so that the two calls run concurrently. This is done with the subscribeOn operator:

    Observable<T> apiCall1WithTimeoutNewThread = apiCall1WithTimeout.subscribeOn(Schedulers.newThread());
    Observable<T> apiCall2WithTimeoutNewThread = apiCall2WithTimeout.subscribeOn(Schedulers.newThread());
    

    The final step is to merge the emissions from both observables so that any timeout error is emitted last, after the results that did arrive. The mergeDelayError operator does exactly that:

    Observable<T> mergedCalls = Observable.mergeDelayError(apiCall1WithTimeoutNewThread, apiCall2WithTimeoutNewThread);
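
The behaviour these three steps aim for (a timeout per call, concurrent execution, errors reported after the results) can be checked in isolation with a plain-JDK sketch. This is not RxJava: it swaps in java.util.concurrent.CompletableFuture and assumes Java 9+ for orTimeout. The names TimeoutSketch, Outcome, callAll and fakeApiCall are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Plain-JDK model of the semantics described above (an assumption-laden sketch, not RxJava).
class TimeoutSketch {

    // Whatever arrived in time, plus one error per call that did not.
    static final class Outcome<T> {
        final List<T> results = new ArrayList<>();
        final List<Throwable> errors = new ArrayList<>();
    }

    // Hypothetical stand-in for an API call that answers after delayMs.
    static Supplier<String> fakeApiCall(String value, long delayMs) {
        return () -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return value;
        };
    }

    @SafeVarargs
    static <T> Outcome<T> callAll(long timeoutMs, Supplier<T>... calls) {
        // A dedicated pool so every call gets its own thread (the subscribeOn step).
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            // Start all calls first, each with its own timeout (the amb + timer step).
            List<CompletableFuture<T>> futures = new ArrayList<>();
            for (Supplier<T> call : calls) {
                futures.add(CompletableFuture.supplyAsync(call, pool)
                        .orTimeout(timeoutMs, TimeUnit.MILLISECONDS));
            }
            // Collect results and keep errors aside (the mergeDelayError step).
            Outcome<T> outcome = new Outcome<>();
            for (CompletableFuture<T> future : futures) {
                try {
                    outcome.results.add(future.join());
                } catch (CompletionException e) {
                    outcome.errors.add(e.getCause());
                }
            }
            return outcome;
        } finally {
            pool.shutdownNow(); // interrupt calls that are still running
        }
    }

    public static void main(String[] args) {
        Outcome<String> outcome =
                callAll(500, fakeApiCall("fast", 50), fakeApiCall("slow", 2_000));
        System.out.println(outcome.results + " " + outcome.errors);
    }
}
```

One difference from the merged observable: this sketch lists results in call order, whereas a merge delivers items in the order they arrive.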