Tags: android, rx-java2, rx-android

flatMap runs on the main thread even when using Schedulers.io()


I've tested this and confirmed that emitting the value from the main thread causes the problem. However, my use case requires receiving a value on the main thread (for example in onActivityResult) and then continuing the Rx flow. What should I do to make flatMap run on a thread other than main?

class MainActivity : AppCompatActivity() {

    private lateinit var emitter: ObservableEmitter<String>

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        btnFlatMap.setOnClickListener {

            val obs = Observable.create<String> {
                emitter = it
                logThread("inside observable")

                // TODO: fetch some configuration from the internet or local db

                // TODO: then call startActivityForResult()
            }

            obs
                .flatMap {
                    logThread("flatMap, Banana")
                    Observable.just("$it, 1 Item")
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ next ->
                    logThread("onNext")
                }, { error ->
                    logThread("onError")
                }, {
                    logThread("onComplete")
                })
        }

        btnEmitter.setOnClickListener {
            // TODO: simulate that onActivityResult is called
            emitter.onNext("Banana")
            emitter.onComplete()
        }
    }

    private fun logThread(operation: String) {
        Log.e("THREAD", "$operation run at [${Thread.currentThread().name}]")
    }
}

current Logcat

inside observable run at [RxCachedThreadScheduler-1]

flatMap, Banana run at [main]

onNext run at [main]

onComplete run at [main]

expected Logcat

inside observable run at [RxCachedThreadScheduler-1]

flatMap, Banana run at [RxCachedThreadScheduler-1]

onNext run at [main]

onComplete run at [main]


Solution

  • Adding one more observeOn(Schedulers.io()) before flatMap does the trick: it tells Rx to switch back to a worker thread.

            obs
                .observeOn(Schedulers.io()) // <-- added
                .flatMap {
                    logThread("flatMap, Banana")
                    Observable.just("$it, 1 Item")
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ next ->
                    logThread("onNext")
                }, { error ->
                    logThread("onError")
                }, {
                    logThread("onComplete")
                })
    

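    Because the value is emitted from the main thread, everything downstream of the source, including flatMap, runs on the main thread. subscribeOn(Schedulers.io()) only decides which thread the subscription happens on, that is, where the body of Observable.create runs. It does not move later emissions, which are delivered on whichever thread calls emitter.onNext(). observeOn() exists for exactly this purpose: it switches everything downstream of it to the given scheduler, here back to a worker thread.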

    In short:

    • observeOn changes the thread on which everything downstream of it executes.

    • subscribeOn sets the thread on which the upstream (root source) is subscribed to; values emitted later from another thread stay on that thread.

    The Logcat now shows:

    flatMap, Banana run at [RxCachedThreadScheduler-1]

    onNext run at [main]

    onComplete run at [main]
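
The behaviour described above can also be reproduced off-device. The following is a minimal sketch, assuming RxJava 2 (the io.reactivex packages) is on the classpath and running on a plain JVM, so the calling thread stands in for the Android main thread. The helper flatMapThread and its withObserveOn parameter are hypothetical names introduced only for this illustration; the latches exist solely to make the sketch deterministic.

```kotlin
import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper: returns the name of the thread that ran the flatMap mapper.
fun flatMapThread(withObserveOn: Boolean): String {
    val emitterRef = AtomicReference<ObservableEmitter<String>>()
    val mapperThread = AtomicReference("")
    val subscribed = CountDownLatch(1)
    val done = CountDownLatch(1)

    // As in the question: the emitter is stored and used later from another thread.
    val obs = Observable.create<String> {
        emitterRef.set(it)
        subscribed.countDown()
    }
    // Optionally hop to a worker thread before flatMap (the fix from the answer).
    val upstream = if (withObserveOn) obs.observeOn(Schedulers.io()) else obs

    upstream
        .flatMap {
            mapperThread.set(Thread.currentThread().name)
            Observable.just("$it, 1 Item")
        }
        .subscribeOn(Schedulers.io())
        .subscribe({ }, { done.countDown() }, { done.countDown() })

    // Wait until the create block has run on the io thread, then emit from the caller.
    check(subscribed.await(5, TimeUnit.SECONDS)) { "subscription did not happen" }
    val emitter = emitterRef.get()
    emitter.onNext("Banana")
    emitter.onComplete()

    check(done.await(5, TimeUnit.SECONDS)) { "stream did not terminate" }
    return mapperThread.get()
}

fun main() {
    println("without observeOn: flatMap ran on [${flatMapThread(false)}]")
    println("with observeOn:    flatMap ran on [${flatMapThread(true)}]")
}
```

Without observeOn, the mapper should report the thread that called emitter.onNext(), regardless of subscribeOn. With observeOn(Schedulers.io()), it should report an RxCachedThreadScheduler worker. The worker number may differ from the one that ran the create block, because subscribeOn and observeOn each take their own worker from the scheduler.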