Search code examples
androiderror-handlingrx-javaretrofit2rx-android

RxJava : How to handle error with zip operator ?


I am using RxJava and RxAndroid with Retrofit2.

Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

Using zip operator as below on above two Observer.

 Observable<ArrayList<TestData>> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable, new Func2<ResponseOne, ResponseTwo, ArrayList<TestData>>() {
            @Override
                public ArrayList<TestData> call(ResponseOne responseOne, ResponseTwo responseTwo) {
                  ArrayList<TestData> testDataList = new ArrayList();
                      // Add test data from response responseOne & responseTwo
                  return testDataList;
            } 
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<ArrayList<TestData>>() {

        @Override
        public void onNext(ArrayList<TestData> testDataList) {

        }

        @Override
        public void onCompleted() {
            Log.d(TAG, "onCompleted" );
        }

        @Override
        public void onError(Throwable t) {
            Log.d(TAG, "onError Throwable: " + t.toString() );
        }
    });

If there is any error occurs during retrofit http call in responseOneObservable and responseTwoObservable then it will directly call onError method of subscriber of testDataObservable.

I want to continue in zip operator's call method even if anyone of two observable got success response.

How to handle error response using zip operator ?


Solution

  • You can use onErrorResumeNext to return some Observable or onErrorReturn to return some default value to zip, like:

    Observable.zip(
       responseOneObservable
           .onErrorReturn(new Func1<Throwable, ResponseOne>() {
            @Override
            public ResponseOne call(final Throwable throwable) {
                return new ResponseOne();
            }
        }),
       responseTwoObservable
           .onErrorReturn(new Func1<Throwable, ResponseTwo>() {
            @Override
            public ResponseTwo call(final Throwable throwable) {
                return new ResponseTwo();
            }
        }),
       ...
    

    See onError handling for more info.


    UPDATE: With RxJava 2.0 you must use Function instead of Func1:

    import io.reactivex.functions.Function;
    ...
    Observable.zip(
       responseOneObservable
           .onErrorReturn(new Function<Throwable, ResponseOne>() {
            @Override
            public ResponseOne apply(@NonNull final Throwable throwable) {
                return new ResponseOne();
            }
        }),
       responseTwoObservable
           .onErrorReturn(new Function<Throwable, ResponseTwo>() {
            @Override
            public ResponseTwo apply(@NonNull final Throwable throwable) {
                return new ResponseTwo();
            }
        }),
       ...
    

    Or using lambdas:

    Observable.zip(
       responseOneObservable
           .onErrorReturn(throwable -> new ResponseOne()),
       responseTwoObservable
           .onErrorReturn(throwable -> new ResponseTwo()),
       ...
    

    Or using Kotlin:

    Observable.zip(
       responseOneObservable
           .onErrorReturn { ResponseOne() },
       responseTwoObservable
           .onErrorReturn { ResponseTwo() },
       ...