java, rx-java, reactive-programming, rx-java3

Why doesn't Flowable.subscribe(Subscriber) return a Disposable?


Most of the Flowable.subscribe() overloads return a Disposable, which enables a flow to be cleaned up. I'm in the habit of doing:

Disposable d = Flowable.just(...)
    .map(...)
    .subscribe(
        n -> ...,   // onNext
        t -> ...,   // onError
        () -> ...   // onComplete
    );

// someone clicks "cancel" in another thread
d.dispose();

However, when using .subscribe(Subscriber), no Disposable is returned. I'd like to use .subscribe(Subscriber) so I can pass in a TestSubscriber to verify behaviour. So how would I dispose of the flow in this case?

I searched the Javadoc for appropriate Subscribers. There's DisposableSubscriber, which looks like it would work, but there are two problems:

  1. The class description reads as follows, which suggests cancel() cannot be used from outside a flow:

Use the protected request(long) to request more items and cancel() to cancel the sequence from within an onNext implementation.

  2. TestSubscriber does not extend DisposableSubscriber.

Solution

  • You can use Flowable.subscribeWith(Subscriber) instead of subscribe, so that your Subscriber is returned, instead of void.

    In RxJava 3.x TestSubscriber no longer implements Disposable. It does implement the dispose and isDisposed methods, as defined by BaseTestConsumer, which it extends. However, both of those methods have been made protected, so you can't actually use them directly. Luckily, there is TestSubscriber.cancel()/TestSubscriber.isCancelled(), which are public, and are equivalent to dispose()/isDisposed(), so you can use those instead.

    As for the reason Flowable.subscribe does not return a Disposable, this change was made in RxJava 2, to adhere to the Reactive-Streams specification:

    Due to the Reactive-Streams specification, Publisher.subscribe returns void ... To remedy this, the method E subscribeWith(E subscriber) has been added to each base reactive class which returns its input subscriber/observer as is.
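
    The approach above can be sketched roughly as follows, assuming RxJava 3.x is on the classpath. The class name SubscribeWithExample, the helpers subscribeForTest and subscribeDisposable, and the doubling map step are hypothetical names and stand-ins for illustration, not part of the original question. The second helper shows that a DisposableSubscriber obtained through subscribeWith can be disposed from outside the flow, since it implements Disposable.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subscribers.DisposableSubscriber;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

public class SubscribeWithExample {

    // Hypothetical helper: subscribeWith returns the subscriber it was given,
    // so the caller keeps a handle to the TestSubscriber.
    static TestSubscriber<Integer> subscribeForTest(Flowable<Integer> source) {
        return source
                .map(n -> n * 2) // stand-in for the real pipeline
                .subscribeWith(new TestSubscriber<Integer>());
    }

    // Hypothetical helper: DisposableSubscriber implements Disposable,
    // so the returned subscriber can be held as a Disposable.
    static Disposable subscribeDisposable(Flowable<Integer> source) {
        return source.subscribeWith(new DisposableSubscriber<Integer>() {
            @Override public void onNext(Integer n) { /* handle item */ }
            @Override public void onError(Throwable t) { /* handle error */ }
            @Override public void onComplete() { /* handle completion */ }
        });
    }

    public static void main(String[] args) {
        // A finite flow: the TestSubscriber records what it received.
        TestSubscriber<Integer> finished = subscribeForTest(Flowable.just(1, 2, 3));
        finished.assertValues(2, 4, 6);
        finished.assertComplete();

        // Flowable.never() stands in for a long-running flow that someone cancels.
        TestSubscriber<Integer> pending = subscribeForTest(Flowable.never());
        System.out.println(pending.isCancelled()); // prints false
        pending.cancel();                          // public counterpart of dispose()
        System.out.println(pending.isCancelled()); // prints true

        // Outside of tests, a DisposableSubscriber can be disposed the usual way.
        Disposable d = subscribeDisposable(Flowable.never());
        d.dispose();
        System.out.println(d.isDisposed());        // prints true
    }
}
```

    In a test, the TestSubscriber handle serves both purposes: cancel() stops the flow, and the assert methods verify what was received before cancellation.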