I've tested and it's surely that emit value from the main thread cause this problem. However, I want to know if I have this use case to receive some value from the main thread to continue the Rx flow. what should be done in order to make flatMap run in a different thread than main.
class MainActivity : AppCompatActivity() {
private lateinit var emitter: ObservableEmitter<String>
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
btnFlatMap.setOnClickListener {
val obs = Observable.create<String> {
emitter = it
logThread("inside observable")
// TODO: fetch some configuration from the internet or local db
// TODO: then call startActivityForResult()
}
obs
.flatMap {
logThread("flatMap, Banana")
Observable.just("$it, 1 Item")
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ next ->
logThread("onNext")
}, { error ->
logThread("onError")
}, {
logThread("onComplete")
})
}
btnEmitter.setOnClickListener {
// TODO: simulate that onActivityResult is called
emitter.onNext("Banana")
emitter.onComplete()
}
}
private fun logThread(operation: String) {
Log.e("THREAD", "$operation run at [${Thread.currentThread().name}]")
} }
current Logcat
inside observable run at [RxCachedThreadScheduler-1]
flatMap, Banana run at [main]
onNext run at [main]
onComplete run at [main]
expected Logcat
inside observable run at [RxCachedThreadScheduler-1]
flatMap, Banana run at [RxCachedThreadScheduler-1]
onNext run at [main]
onComplete run at [main]
add one additional onserverOn(Schedulers.io())
does the trick, to tell Rx switch thread back to worker thread
obs
.observeOn(Schedulers.io()) <------ additional
.flatMap {
logThread("flatMap, Banana")
Observable.just("$it, 1 Item")
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({ next ->
logThread("onNext")
}, { error ->
logThread("onError")
}, {
logThread("onComplete")
})
because the data emitted from the main thread, it causes RX to switch the whole downstream from worker thread (Schedulers.io) which registered by subscribeOn()
while a subscription to the main thread. in order to change (switch) execution thread for the whole downstream to worker thread again observeOn()
is made for that purpose.
in short
observeOn
, change downstream to executing on a particular thread.
subscribeOn
, set the upstream (root source) to executing on a particular thread.
Now the Logcat
flatMap, Banana run at [RxCachedThreadScheduler-1]
onNext run at [main]
onComplete run at [main]