Search code examples
asynchronousjava-8rx-javareactive-programmingreactivex

Observables Are Less Opinionated?


As stated in the ReactiveX Introduction - Observables Are Less Opinionated

ReactiveX is not biased toward some particular source of concurrency or asynchronicity. Observables can be implemented using thread-pools, event loops, non-blocking I/O, actors (such as from Akka), or whatever implementation suits your needs, your style, or your expertise. Client code treats all of its interactions with Observables as asynchronous, whether your underlying implementation is blocking or non-blocking and however you choose to implement it.

I am not getting the part - "whether your underlying implementation is blocking or non-blocking".

Can you explain more of it? Or some example code to explain this?


Solution

  • Observable.fromCallable(() -> doSomeReallyLongNetworkRequest())
        .subscribe(data -> {
            showTheDataOnTheUI(data);
        });
    

    Where do you think the doSomeReallyLongNetworkRequest() will run (thread-wise)?
    Well if you will run this code at main thread, the network call will run at main thread!

    "Observables Are Less Opinionated" means that the multi threading is abstracted away from the actual work. the Subscriber don't know (and don't need to), where the Observable will run, it can run on thread-pool, event loop, or it can even run in blocking fashion.
    That's why all Observable interaction happen with async API.

    While putting it this way seems like a drawback, the opposite is true, that means that you have greater control where each part of your code is run, without exposing the operation itself and the code that react to Observable emission to this knowledge.

    This is done using the Schedulers mechanism in RxJava, together with subscribeOn()/observeOn() operators:

    Observable.fromCallable(() -> doSomeReallyLongNetworkRequest())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(data -> {
            showTheDataOnTheUI(data);
        });
    

    now you told the Observable to perform the subscription code (doSomeReallyLongNetworkRequest()) to run on IO Schdeuler that will create a dedicated thread for the network request, and on the other side, you told the Observable to notify the about emissions (onNext()) Subscriber (showTheDataOnTheUI(data)) at main thread (sorry for android specifics).

    With this approach you have very powerful mechanism to determine where and how both operations will work and where notifications will be fired, and very easily ping pong between different threads, this great power comes because of the async API, plus the abstraction of threads away to dedicated operators and Scheduler mechanism.

    UPDATE: further explanation:

    Client code treats all of its interactions with Observables as asynchronous

    Client code here means any code that interacts with the Observable, the simple example is the Subscriber which is the client of the Observable, as of the composable nature of Observable, you can get an Observable from some API without knowing exactly how its operate so :

    Observable.fromCallable(() -> doSomeReallyLongNetworkRequest())
    

    can be encapsulate with some service API as Observable<Data>, and when Subscriber interacts with it, it's happen with async fashion using the Observable events onNext,onError,onComplete.

    whether your underlying implementation is blocking or non-blocking and however you choose to implement it.

    "underlying implementation" refers to the operation the Observable do, it can be blocking work like my example of network call, but it can also be a notifications from the UI (clicks events), or update happens at some external module, all of which are non-blocking implementation, and again as of its async API nature the Subscribe shouldn't care, it just need to react to notifications without worrying about where and how the implementation (Observable body) will act.