Search code examples
rx-java2reactivex

Difference between Observer and DisposableObserver in RxJava2


I know that DisposableObserver has implemented both of Observer and Disposable. Flagged onSubscribe() method as final and provided onStart() instead of it.

But I can't understand what are the differences between this two in action. And when I should use Observer or DisposableObserver?

Could you please tell the benefits or disadvantages of using these as compared to each other?


Solution

  • The normal onSubscribe method of observers can be called as many as you use this same observer. but with DisposableObserver onSubscibe method can be called only once meaning that you are only allowed to use this DisposableObserver once. if you pass a single object of DisposableObserver to two stream it will throw exception and close both of them quickly. this logic has been implemented in onSubscribe() of this class thus you can not override it. but in case you need the onSubscribe() callback you can override onStart() method which is the same.

    The usage of this class can be as follows.

    As per documentation a DisposableObserver is:

    An abstract Observer that allows asynchronous cancellation by implementing Disposable.

    in other words it means that you can use disposable behaviors inside your observer methods. like calling dispose() in onNext().

    Observable.just(1, 2, 3, 4, 5)
            .map {
                "$it"
            }
            .subscribe(object : DisposableObserver<String>(){
                override fun onComplete() {
                }
    
                override fun onNext(t: String) {
                    println("first item only= $t")
    
                    //we can dispose this stream right in the observer methods
                    dispose()
                }
    
                override fun onError(e: Throwable) {
                }
    
            })
    

    One may even combine DisposableObserver with subscribeWith() to get almost the same behavior as normal observers.

    val disposableObserver = object : DisposableObserver<String>() {
            override fun onComplete() {
            }
    
            override fun onNext(t: String) {
                println("first item only= $t")
    
                //we can dispose this stream right in the observer methods
                dispose()
            }
    
            override fun onError(e: Throwable) {
            }
    
        }
    
        val disposable: Disposable = Observable.just(1, 2, 3, 4, 5)
            .map {
                "$it"
            }
            .subscribeWith(disposableObserver)
    
        disposable.dispose()
    

    This and many other classes and operators in RX-Java are there to ease the use of RX and depending on how you want to use the library any of them can be chosen.