Search code examples
kotlinexceptionbroadcastreceiverrx-java2

RxJava2 Flowable onErrorReturn not called


Say I need to wrap BroadcastReceiver with Flowable:

        Flowable
                .create<Boolean>({ emitter ->
                    val broadcastReceiver = object : BroadcastReceiver() {
                        override fun onReceive(context: Context?, intent: Intent?) {
                            throw RuntimeException("Test exception")
                        }
                    }
                    application.registerReceiver(broadcastReceiver, IntentFilter(LocationManager.PROVIDERS_CHANGED_ACTION))
                }, BackpressureStrategy.MISSING)
                .onErrorReturn { false }

Then I need to catch any exceptions thrown inside Flowable in one single place.

I supposed onErrorReturn should be able to catch that throw RuntimeException("Test exception") inside broadcastReceiver but it doesn't catch that exception and app crashes.

Certainly, I can wrap anything inside BroadcastReceiver with try/catch. But actually, I have a lot of source code there so that adding try/catch makes source code quite messy.

Is there any way to catch all the exceptions thrown in any line inside Flowable in one single place?


Solution

  • In case of Flowable#create() to follow contract of Flowable if you have error and want to pass it through stream, you need to catch it and call emitter.onError(). If you do that, Flowable.onErrorReturn() starts work as expected.

    To properly register/unregister BroadcastReceiver and handle exceptions you can use that approach

    Flowable
            .create<Boolean>({ emitter ->
                val broadcastReceiver = object : BroadcastReceiver() {
                    override fun onReceive(context: Context?, intent: Intent?) {
                        try {
                            throw RuntimeException("Test exception")
                        } catch(e: Throwable) {
                            emitter.tryOnError(e)
                        }
                    }
                }
                try {
                    application.registerReceiver(broadcastReceiver, IntentFilter(LocationManager.PROVIDERS_CHANGED_ACTION))
    
                    emitter.setCancellable {
                        application.unregisterReceiver(broadcastReceiver)
                    }
                } catch (e: Throwable) {
                    emitter.tryOnError(e)
                }
            }, BackpressureStrategy.MISSING)
            .onErrorReturn { false }