Search code examples
androidretrofit2rx-java2

Rxjava observeOn and subscribeOn in Retrofit


observeOn: This method simply changes the thread of all operators further downstream (https://medium.com/upday-devs/rxjava-subscribeon-vs-observeon-9af518ded53a)

When calling API, I want to run the communicating with a server on IO thread and want to handle the result on mainThread.

I see the below code in many tutorials and no doubt it is correct. but my understanding is opposite so I'd like to know what I'm misunderstanding.

requestInterface.callApi()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe())

observeOn(AndroidSchedulers.mainThread())

: observeOn changes the thread of all operators further downstream, but in the example, the actual calling API function is upper than observeOn?

.subscribeOn(Schedulers.io())

: Weird part, it needs to be subscribed on main-thread, but subscribe on IO thread?

Please advise me What I'm misunderstanding?


Solution

  • Basic, we will have

    Observable.subscribe(Observer);// => Observer observe Observable and Observable subscribe Observer
    

    Example

    requestInterface.callApi().subscribe(new Observer...); // requestInterface.callApi() <=> Observable
    

    From the http://reactivex.io/documentation/operators/subscribeon.html

    SubscribeOn

    • SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called

    ObserveOn (affect 2 things)

    • It instructs the Observable to send notifications to Observers on a specified Scheduler.

    • ObserveOn affects the thread that the Observable will use below where that operator appears

    Example

    registerUserReturnedObserverble()  // run on worker thread because subscribeOn(Schedulers.io()) (line 5)
    .andThen(loginReturnObserverble()) // run on worker thread because subscribeOn(Schedulers.io()) (line 5)
    .observeOn(AndroidSchedulers.mainThread())
    .andThen(getUserDataReturnObserverble()) // run on main thread because .observeOn(AndroidSchedulers.mainThread()) is above this operator (line 3)
    .subscribeOn(Schedulers.io())
    .subscribe(new Observer<Void>{
        // run on main thread because observeOn(AndroidSchedulers.mainThread()) 
    });