java reactive-programming rx-java2

Use onErrorReturn with retryWhen in RxJava


Here is the code:

import io.reactivex.Observable;
import io.reactivex.Observer;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

public class RxJavaTest {

    @Test
    public void onErr() {

        Observable<String> values1 = new Observable<String>() {
            @Override
            protected void subscribeActual(Observer<? super String> observer) {
                observer.onNext("New");
                observer.onNext("New1");
                observer.onNext("New2");
                observer.onNext("New3");
                observer.onNext("New4");
                if (ThreadLocalRandom
                            .current()
                            .nextInt(10) == 5) {
                    observer.onError(new Exception("don't retry..."));
                } else {
                    observer.onError(new IllegalArgumentException("retry..."));
                }
            }
        };
        AtomicBoolean finished = new AtomicBoolean(false);
        values1
                .retryWhen(throwableObservable -> throwableObservable
                        .takeWhile(throwable -> {
                            boolean result = (throwable instanceof IllegalArgumentException);
                            if (result) {
                                System.out.println("Retry on error: " + throwable);
                                return result;
                            }
                            System.out.println("Error: " + throwable);
                            return result;
                        })
                        .take(20))
                .onErrorReturn(throwable -> "Saved the day!")
                .doOnTerminate(() -> finished.set(true))
                .subscribe(v -> System.out.println(v));
    }
}

The goal is to

  • retry only when there is an IllegalArgumentException,
  • for any other exception, stop immediately and emit a fallback value (via onErrorReturn).

The code above accomplishes the first goal but fails at the second: on any other exception it stops retrying, but the .onErrorReturn part is never invoked and the stream just completes.

Any idea how to make it work?


Solution

  • You can make it work by changing your retryWhen to:

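    .retryWhen(throwableObservable ->
            throwableObservable.flatMap(throwable -> {
                if (throwable instanceof IllegalArgumentException) {
                    // Emitting any item tells retryWhen to resubscribe to the source.
                    System.out.println("Retry on error: " + throwable);
                    return Observable.just(1);
                } else {
                    // Re-signalling the error terminates the sequence with onError,
                    // so the downstream onErrorReturn gets a chance to handle it.
                    System.out.println("Error: " + throwable);
                    return Observable.<Integer>error(throwable);
                }
            })
    )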
    

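    The original version fails because takeWhile completes the handler's Observable as soon as it sees an exception that is not an IllegalArgumentException, so retryWhen forwards onComplete downstream and onErrorReturn never sees an error. Returning Observable.error(throwable) from flatMap makes the handler terminate with onError instead, which onErrorReturn can then turn into a value. To trigger a retry, the handler only needs to emit an item; which value it emits does not matter (the example above emits 1). As per the Javadoc: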

    If that ObservableSource calls onComplete or onError then retry will call onComplete or onError on the child subscription. Otherwise, this ObservableSource will resubscribe to the source ObservableSource.
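
To see the difference side by side, here is a minimal sketch that runs both handlers against a deterministic source. The class name RetryWhenDemo and the helpers flakySource, withTakeWhile and withFlatMap are made up for illustration; the source is assumed to fail with IllegalArgumentException on its first two subscriptions and with a plain Exception on the third, instead of failing randomly as in the question.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryWhenDemo {

    // Hypothetical source: emits "New", then fails with a retryable error on
    // subscriptions 1 and 2 and with a non-retryable error on subscription 3.
    static Observable<String> flakySource() {
        AtomicInteger attempts = new AtomicInteger();
        return Observable.<String>create(emitter -> {
            emitter.onNext("New");
            if (attempts.incrementAndGet() < 3) {
                emitter.onError(new IllegalArgumentException("retry..."));
            } else {
                emitter.onError(new Exception("don't retry..."));
            }
        });
    }

    // Handler from the question: takeWhile completes the handler on the
    // non-retryable error, so downstream sees onComplete, not onError.
    static List<String> withTakeWhile() {
        return flakySource()
                .retryWhen(errors -> errors
                        .takeWhile(t -> t instanceof IllegalArgumentException))
                .onErrorReturn(t -> "Saved the day!")
                .toList()
                .blockingGet();
    }

    // Handler from the answer: flatMap re-signals the non-retryable error,
    // so downstream sees onError and onErrorReturn supplies the fallback.
    static List<String> withFlatMap() {
        return flakySource()
                .retryWhen(errors -> errors.flatMap(t -> {
                    if (t instanceof IllegalArgumentException) {
                        return Observable.just(1);
                    }
                    return Observable.<Integer>error(t);
                }))
                .onErrorReturn(t -> "Saved the day!")
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(withTakeWhile()); // [New, New, New]
        System.out.println(withFlatMap());   // [New, New, New, Saved the day!]
    }
}
```

Both variants subscribe three times, but only the flatMap version ends with the fallback value, because only it lets the final error reach onErrorReturn.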