Inspired by the .NET TPL, I'm trying to find a way to handle an error outside the Rx pipeline. Specifically, on error, I want the Observable pipeline to stop and pass control back to the surrounding method. Something like:
    public void testRxJava() {
        try {
            Observable.range(0, 5)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .map(i -> { throw new RuntimeException(); })
                .subscribe();
        } catch (Exception ex) {
            // I was hoping to get here on the main thread, but crashed instead
            Log.i("Test", "Will never get here");
        }
    }
This causes the application to crash with an io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException, which is not caught by the catch clause and instead invokes the main thread's uncaughtException() handler.
Throwing from the error handler in subscribe() does not work either; it again falls back to the uncaughtException() handler.
Is there a way to re-throw or otherwise pass the error information to the calling method?
A similar question for C# is found here.
Here's what I ended up doing. As far as I can tell, this is the only way to leave the ReactiveX pipeline and let the surrounding code handle the error. I'd be happy to hear if someone has a more elegant way:
    public void testRxJava() {
        try {
            // will be null if no error, will hold a Throwable on error
            AtomicReference<Throwable> opError = new AtomicReference<>(null);
            Observable.range(0, 5)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .map(i -> { throw new RuntimeException(); }) // throws
                // blockingSubscribe() blocks the calling thread until the stream
                // terminates and runs both callbacks on that thread
                .blockingSubscribe(
                    result -> Log.i("Test", "will never happen"),
                    error -> opError.set(error) // stores the error without crashing the app
                );
            // re-throw on the calling thread, wrapping the original error as the cause
            if (opError.get() != null) {
                throw new Exception(opError.get());
            }
        } catch (Exception ex) {
            Log.e("Test", "exception", ex);
        }
    }
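For anyone who wants to see the capture-then-rethrow idea in isolation, here is a minimal plain-JDK sketch with no RxJava or Android involved. The class name RethrowOnCaller and the method runAndRethrow are made up for illustration and are not part of any library. It only shows why the surrounding try/catch starts working once the caller blocks and rethrows the stored error itself:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, for illustration only (not an RxJava API).
final class RethrowOnCaller {

    // Runs the task on a new thread, blocks until it finishes,
    // then rethrows any failure on the calling thread.
    static void runAndRethrow(Runnable task) throws Exception {
        AtomicReference<Throwable> error = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                error.set(t);      // remember the failure instead of letting it escape
            } finally {
                done.countDown();  // signal completion, whether it succeeded or failed
            }
        });
        worker.start();
        done.await();              // the caller blocks here until the worker is done

        Throwable t = error.get();
        if (t != null) {
            throw new Exception(t); // now thrown on the caller's thread
        }
    }

    public static void main(String[] args) {
        try {
            runAndRethrow(() -> { throw new RuntimeException("boom"); });
            System.out.println("not reached");
        } catch (Exception ex) {
            // The original exception is available as the cause.
            System.out.println("caught: " + ex.getCause().getMessage());
        }
    }
}
```

The trade-off is the same as with blockingSubscribe(): the caller is blocked for the whole duration of the work, so this is probably only reasonable where blocking that thread is acceptable.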