I am experimenting with RxJava on Android and ran into a problem using the ambWith operator with two Observables.
As I understand the definition of amb (http://reactivex.io/documentation/operators/amb.html),
only the first Observable to send a notification is used; the others are ignored and discarded.
I connected two Observables: one makes a network call (with Retrofit), the other responds with data from memory.
The code looks like this:
retrofitService.loadInfo()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
// this observable returns an item from memory and should be observed on the UI thread
.ambWith(observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()))
.subscribe(action);
The problem is that if the second Observable emits an item first (which is very likely), the first Observable gets cancelled. This results in a NetworkOnMainThreadException if the web service call has already started.
It works fine if I remove the
.observeOn(AndroidSchedulers.mainThread())
from the second Observable, but I need the result on the UI thread.
My guess is that the amb operator runs on the main thread and the cancel operation somehow ends up on the main thread as well.
Did I miss something? Does anybody know how to get around this?
Any help is appreciated.
Your observeOn calls apply only to the two source Observables, upstream of ambWith, so I think moving a single observeOn after ambWith may help:
retrofitService.loadInfo()
.subscribeOn(Schedulers.newThread())
.ambWith(observable.subscribeOn(Schedulers.newThread()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(action);
Now all results are delivered on the main thread. Give it a try.
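To illustrate why the position of observeOn matters, here is a rough plain-Java model (no RxJava involved, just two single-thread executors named "ui" and "worker"). It assumes that amb cancels the losing source on whichever thread delivers the winner's first item; that is my understanding of the behaviour, not something this snippet proves. The class AmbThreadModel and its run method are made up for this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not RxJava: it only tracks which thread performs each step.
class AmbThreadModel {

    /**
     * Simulates the in-memory source winning the race.
     * Returns { thread that cancels the losing (network) source,
     *           thread that calls the subscriber }.
     */
    static String[] run(boolean observeOnBeforeAmb) {
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        CompletableFuture<String> cancelledOn = new CompletableFuture<>();
        CompletableFuture<String> deliveredOn = new CompletableFuture<>();

        // Subscriber callback: records the thread it is called on.
        Runnable subscriber = () -> deliveredOn.complete(Thread.currentThread().getName());

        // Stand-in for the amb step (assumption): the first item cancels the loser
        // on the current thread, then the item travels on to the subscriber.
        Runnable ambStep = () -> {
            cancelledOn.complete(Thread.currentThread().getName());
            if (observeOnBeforeAmb) {
                subscriber.run();        // already on "ui", nothing left to hop over
            } else {
                ui.execute(subscriber);  // observeOn placed after amb: hop to "ui" now
            }
        };

        // The in-memory source emits its item on the worker thread.
        worker.execute(() -> {
            if (observeOnBeforeAmb) {
                ui.execute(ambStep);     // observeOn placed before amb: hop first, race second
            } else {
                ambStep.run();           // the race is decided on the worker thread
            }
        });

        try {
            return new String[] {
                cancelledOn.get(5, TimeUnit.SECONDS),
                deliveredOn.get(5, TimeUnit.SECONDS)
            };
        } catch (Exception e) {
            throw new IllegalStateException("model did not finish", e);
        } finally {
            ui.shutdown();
            worker.shutdown();
        }
    }
}
```

With run(true) (observeOn before the race, as in the question) the model reports the cancellation on "ui". With run(false) (observeOn after the race, as in the snippet above) it reports the cancellation on "worker". The subscriber is called on "ui" in both cases.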