android rx-java rx-java2

RxJava — emit on subscribeOn() thread


I have the following code:

Single.create { emitter ->
   // I/O thread here

   ThirdPartySDK.doSomeAction {
       // Main thread here

      emitter.onSuccess(someValue)
   }
}
.flatMap {
  someOtherSingle(it) // Executes on main thread
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({},{})

The ThirdPartySDK.doSomeAction callback is posted on the main thread, so the emitter also emits on the main thread rather than on the subscribeOn thread (and if the flatMap further down the chain does any network work, the chain fails).

If I add observeOn(Schedulers.io()) after the first Single, it switches to the correct thread, but is there any way to emit on the right thread in the first place? I can't modify the ThirdPartySDK behaviour.


Solution

  • subscribeOn

    The subscribe lambda (subscribeActual) is invoked on a thread of the given scheduler.

    observeOn

    Switches the thread to the given scheduler. Every onNext/onSuccess/onError call arriving from upstream is re-emitted downstream on a thread of the observeOn scheduler.

    As you already said, subscribeOn only causes subscribeActual (here, the create lambda) to be invoked on a thread of the given scheduler. This does not mean that the downstream emission happens on the same thread. In your case onSuccess is called from a different thread, the one the SDK uses for its callback (in general this could be a database or HTTP thread pool, etc.).

    onSuccess is called from a thread you do not control (in your case the main thread), and everything downstream of it runs on that thread too. Therefore flatMap is called from the main thread. Network calls made on the main thread inside flatMap will probably fail, because blocking the main thread is not allowed.
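
    The effect described above can be reproduced on a plain JVM without Android. The following is a minimal sketch, assuming RxJava 2 (`io.reactivex`) on the classpath; `FakeSdk` and its "sdk-callback" thread are hypothetical stand-ins for the real SDK and the main thread, and `threadSeenByMap` is a helper name made up for this illustration.

```kotlin
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

// Hypothetical stand-in for the third-party SDK: it always invokes the
// callback on its own thread, just as the real SDK posts to the main thread.
object FakeSdk {
    private val callbackExecutor = Executors.newSingleThreadExecutor { r ->
        Thread(r, "sdk-callback").apply { isDaemon = true }
    }

    fun doSomeAction(callback: (String) -> Unit) {
        callbackExecutor.execute { callback("value") }
    }
}

// Returns (thread that ran the create lambda, thread that ran map).
fun threadSeenByMap(): Pair<String, String> {
    var subscribeThread = ""
    val mapThread = Single.create<String> { emitter ->
        // Runs on the subscribeOn scheduler (an io thread).
        subscribeThread = Thread.currentThread().name
        // The emission itself happens on the SDK's callback thread.
        FakeSdk.doSomeAction { emitter.onSuccess(it) }
    }
        // No observeOn here, so map runs on whatever thread called onSuccess.
        .map { Thread.currentThread().name }
        .subscribeOn(Schedulers.io())
        .blockingGet()
    return subscribeThread to mapThread
}

fun main() {
    val (subscribeThread, mapThread) = threadSeenByMap()
    println("create lambda ran on: $subscribeThread")
    println("map ran on: $mapThread")
}
```

    Here the create lambda runs on an io thread, while map runs on "sdk-callback", because subscribeOn has no influence on the thread that later calls onSuccess.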

    How to solve this? Just place an observeOn right after Single#create. The main thread calls onSuccess, so the observeOn subscriber is called from the main thread. It then redirects the downstream onSuccess call (e.g. into flatMap) to a thread of the given observeOn scheduler. This guarantees that flatMap is called from a thread other than the main looper thread.

    Example:

    import io.reactivex.Single
    import io.reactivex.android.schedulers.AndroidSchedulers
    import io.reactivex.schedulers.Schedulers
    import org.junit.Test

    @Test
    fun wurst() {
        val thirdPartySDKImpl = ThirdPartySDKImpl()
        Single.create<String> { emitter ->
            thirdPartySDKImpl.doSomeAction {
                emitter.onSuccess(it)
            }
        }
            // .subscribeOn(Schedulers.computation())
            // move the emission from the unknown callback thread to a computation thread
            .observeOn(Schedulers.computation())
            // Single.just is subscribed to from a computation thread
            .flatMap { Single.just(123) }
            // move the onSuccess/onError emission from the computation thread to the main thread
            .observeOn(AndroidSchedulers.mainThread())
            // the onSuccess/onError callbacks of subscribe are called on the Android main thread
            .subscribe({}, {})
    }
    
    interface ThirdPartySDK {
        fun doSomeAction(callback: (v: String) -> Unit)
    }
    
    class ThirdPartySDKImpl : ThirdPartySDK {
        override fun doSomeAction(callback: (v: String) -> Unit) {
            // <- impl-detail ->
            callback("whatever")
        }
    
    }
    

    NOTE: You do not need subscribeOn unless the create lambda blocks or does CPU-heavy work. If it only registers a callback that will be invoked from a different thread, subscribeOn is unnecessary.

    but is there any way to emit on right thread?

    You should not introduce your own concurrency inside operators. You might think you could just do something like:

        Single.create<String> { emitter ->
            thirdPartySDKImpl.doSomeAction {
                Schedulers.io().scheduleDirect {
                    emitter.onSuccess(it)
                }
            }
        }
    

    But this is not recommended, because you could break the serialized emission contract^1. This example would make sure that the downstream onSuccess call happens on the expected thread, but cancellation/unsubscription is not handled and there might be other pitfalls.

    If you have a non-reactive API and want to enforce a threading model, I would suggest wrapping the callback-based API in an async (reactive) one that applies the proper observeOn/subscribeOn operators. From then on, use only the async API.

    interface ThirdPartySDKAsync {
        fun doSomeAction(): Single<String>
    }
    
    class ThirdPartySDKAsyncImpl(private val sdk: ThirdPartySDK, private val scheduler: Scheduler) :
        ThirdPartySDKAsync {
        override fun doSomeAction(): Single<String> {
            return Single.create<String> { emitter ->
                sdk.doSomeAction {
                    emitter.onSuccess(it)
                }
            }.observeOn(scheduler)
        }
    }
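
    As a usage sketch for the wrapper above, assuming RxJava 2 on a plain JVM: `CallbackThreadSdk` and `flatMapThreadName` are hypothetical names introduced only for this illustration, and the "sdk-callback" thread again stands in for the main thread. The interfaces and the wrapper are repeated so that the snippet is self-contained.

```kotlin
import io.reactivex.Scheduler
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

interface ThirdPartySDK {
    fun doSomeAction(callback: (v: String) -> Unit)
}

interface ThirdPartySDKAsync {
    fun doSomeAction(): Single<String>
}

// Same wrapper as in the answer: every emission is moved to `scheduler`.
class ThirdPartySDKAsyncImpl(
    private val sdk: ThirdPartySDK,
    private val scheduler: Scheduler
) : ThirdPartySDKAsync {
    override fun doSomeAction(): Single<String> =
        Single.create<String> { emitter ->
            sdk.doSomeAction { emitter.onSuccess(it) }
        }.observeOn(scheduler)
}

// Hypothetical SDK that invokes its callback on a thread of its own choosing.
class CallbackThreadSdk : ThirdPartySDK {
    override fun doSomeAction(callback: (v: String) -> Unit) {
        Thread({ callback("whatever") }, "sdk-callback").start()
    }
}

// Reports the thread on which the work inside flatMap is executed.
fun flatMapThreadName(async: ThirdPartySDKAsync): String =
    async.doSomeAction()
        .flatMap { Single.fromCallable { Thread.currentThread().name } }
        .blockingGet()

fun main() {
    val async = ThirdPartySDKAsyncImpl(CallbackThreadSdk(), Schedulers.io())
    println("flatMap ran on: ${flatMapThreadName(async)}")
}
```

    Callers of the wrapper no longer need to know which thread the SDK calls back on. Passing the scheduler through the constructor is a design choice that also lets tests substitute a different one, for example Schedulers.trampoline().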
    

    Further reading: https://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html

    ^1 Only one thread at a time is allowed to call onNext/onSuccess/onError/onComplete.