Search code examples
rx-javareactive-programmingrx-java2rx-kotlinrx-kotlin2

How to skip exceptions silently in RxJava2?


I have a data flow like this:

Observable
    .fromFuture(
        CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>>
            listOf(1, 2, 3, 57005, 5)
        },
        Schedulers.computation()
    )
    .flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one
    .map {
        CompletableFuture.supplyAsync { // This remote call may fail if it does not like the input. I want to skip that failures and continue the stream like the fail never occurred.
            if (it == 0xDEAD) {
                throw IOException("Dead value!")
            }

            it
        }
    }
    .flatMap {
        Observable.fromFuture(it) // Turn that Futures into a stream of Observables once again
    }
    .doOnNext {
        println(it) // Debug
    }
    .blockingSubscribe()

I've replaced business logic (that actually return Futures) with CompletableFuture.supplyAsync. And, yes, this is Kotlin, but I guess you got the intent.

When I comment "dead" value (57005, 0xDEAD) the output is:

1
4
9
25

But if that "dead" value appears in the stream, it fails:

1
4
9
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.io.IOException: Dead value!
    at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45)
    at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:86)
    at io.reactivex.Observable.blockingSubscribe(Observable.java:5035)
    at by.dev.madhead.rx.TestKt.main(test.kt:41)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Dead value!
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
...

I'm a newbie in RX, so quickly googled for a solution: onExceptionResumeNext: Observable.fromFuture(it) --> Observable.fromFuture(it).onExceptionResumeNext { Observable.empty<Int>() }. But now my application hangs forever (after producing the output I expect). Looks like the stream never ends.

Should I "shutdown" that Observable somehow or what? Or, more generally, is it a good approach when working with RX? Should I rethink it in another way?


Solution

  • Swallow exceptions like this:

     Observable.fromFuture(it).onErrorResumeNext(Observable.empty())