Search code examples
javaobservablerx-java2take

RxJava2 observable take throws UndeliverableException


As I understand RxJava2 values.take(1) creates another Observable that contains only one element from the original Observable. Which MUST NOT throw an exception as it is filtered out by the effect of take(1) as it's happened second.

as in the following code snippet

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

Output

1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more

My questions :

  1. Am I understanding it correct ?
  2. What's really happening to cause the exception.
  3. How to solve this from the consumer ?

Solution

    1. Yes, but because the observable 'ends' does not mean the code running inside create(...) is stopped. To be fully safe in this case you need to use o.isDisposed() to see if the observable has ended downstream.
    2. The exception is there because RxJava 2 has the policy of NEVER allowing an onError call to be lost. It is either delivered downstream or thrown as a global UndeliverableException if the observable has already terminated. It is up to the creator of the Observable to 'properly' handle the case where the observable has ended and an Exception occurs.
    3. The problem is the producer (Observable) and the consumer (Subscriber) disagreeing on when the stream ends. Since the producer is outliving the consumer in this case, the problem can only be fixed in the producer.