Most of the Flowable.subscribe() overloads return a Disposable, which enables a flow to be cleaned up. I'm in the habit of doing:
    Disposable d = Flowable.just(...)
        .map(...)
        .subscribe(
            n -> ...,
            t -> ...,
            () -> ...
        );

    // someone clicks "cancel" in another thread
    d.dispose();
However, when using .subscribe(Subscriber), no Disposable is returned. I'd like to use .subscribe(Subscriber) so I can pass in a TestSubscriber to verify behaviour. So how would I dispose of the flow in this case?
I searched the Javadoc for appropriate Subscribers. There's DisposableSubscriber, which looks like it would work, but there are two problems:
cancel() cannot be used from outside a flow: "Use the protected request(long) to request more items and cancel() to cancel the sequence from within an onNext implementation."
You can use Flowable.subscribeWith(Subscriber) instead of subscribe, so that your Subscriber is returned instead of void.
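To make this concrete, here is a minimal sketch, assuming RxJava 3 is on the classpath. The class name SubscribeWithDemo and the PublishProcessor source are only illustrative choices, not part of the question. It relies on DisposableSubscriber implementing Disposable with a public dispose(), so the object returned by subscribeWith can be disposed from outside the flow:

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.subscribers.DisposableSubscriber;

import java.util.ArrayList;
import java.util.List;

public class SubscribeWithDemo {

    // Emits one item, disposes from outside the flow, then emits another.
    // Returns the items the subscriber actually received.
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        // Illustrative hot source so emissions can be driven by hand.
        PublishProcessor<Integer> source = PublishProcessor.create();

        // subscribeWith returns the subscriber it was given,
        // which can be held as a Disposable.
        Disposable d = source
                .map(n -> n * 10)
                .subscribeWith(new DisposableSubscriber<Integer>() {
                    @Override public void onNext(Integer n) { received.add(n); }
                    @Override public void onError(Throwable t) { t.printStackTrace(); }
                    @Override public void onComplete() { }
                });

        source.onNext(1);   // delivered as 10
        d.dispose();        // e.g. someone clicks "cancel"
        source.onNext(2);   // dropped, the subscriber is gone
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Only the item emitted before dispose() reaches the subscriber, which matches the behaviour of the lambda-based subscribe overloads.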
In RxJava 3.x, TestSubscriber no longer implements Disposable. It does implement the dispose and isDisposed methods, as defined by BaseTestConsumer, which it extends. However, both of those methods have been made protected, so you can't actually use them directly. Luckily, there is TestSubscriber.cancel()/TestSubscriber.isCancelled(), which are public and equivalent to dispose()/isDisposed(), so you can use those instead.
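For the TestSubscriber case, something along these lines should work. This is a sketch assuming RxJava 3; the class name TestSubscriberCancelDemo and the PublishProcessor source are made up for illustration:

```java
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

public class TestSubscriberCancelDemo {

    // Subscribes a TestSubscriber via subscribeWith and cancels it from outside.
    static TestSubscriber<String> run() {
        // Illustrative hot source so emissions can be driven by hand.
        PublishProcessor<String> source = PublishProcessor.create();

        TestSubscriber<String> ts = source
                .map(s -> s.toUpperCase())
                .subscribeWith(new TestSubscriber<String>());

        source.onNext("a");  // delivered as "A"
        ts.cancel();         // public counterpart of the protected dispose()
        source.onNext("b");  // not delivered after cancellation
        return ts;
    }

    public static void main(String[] args) {
        TestSubscriber<String> ts = run();
        ts.assertValues("A");      // only the pre-cancel item arrived
        ts.assertNotComplete();    // cancellation is not a terminal event
        System.out.println(ts.isCancelled());
    }
}
```

The usual assertions remain available on the returned TestSubscriber, so the same object serves for both verification and cancellation.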
As for the reason Flowable.subscribe(Subscriber) does not return a Disposable: this change was made in RxJava 2 to adhere to the Reactive-Streams specification:
"Due to the Reactive-Streams specification, Publisher.subscribe returns void ... To remedy this, the method E subscribeWith(E subscriber) has been added to each base reactive class which returns its input subscriber/observer as is."
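The same shape can be seen without RxJava, using the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror Reactive Streams. The following is a toy sketch only: subscribeWith, range and Collector are hypothetical helpers written for this illustration, not RxJava's implementation, and the publisher is deliberately simplified rather than fully spec-compliant.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class SubscribeWithSketch {

    // Hypothetical helper: Publisher.subscribe returns void,
    // so hand the subscriber back to the caller instead.
    static <T, E extends Flow.Subscriber<? super T>> E subscribeWith(
            Flow.Publisher<T> publisher, E subscriber) {
        publisher.subscribe(subscriber);
        return subscriber;
    }

    // Toy synchronous publisher emitting start..start+count-1 on demand.
    static Flow.Publisher<Integer> range(int start, int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = start;
            private boolean done;

            @Override public void request(long n) {
                while (n-- > 0 && !done) {
                    if (next >= start + count) {
                        done = true;
                        subscriber.onComplete();
                    } else {
                        subscriber.onNext(next++);
                    }
                }
            }

            @Override public void cancel() { done = true; }
        });
    }

    // Hypothetical subscriber that keeps its Subscription so that
    // request and cancel can be called from outside the flow.
    static final class Collector<T> implements Flow.Subscriber<T> {
        final List<T> items = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(T item) { items.add(item); }
        @Override public void onError(Throwable t) { t.printStackTrace(); }
        @Override public void onComplete() { }

        void request(long n) { subscription.request(n); }
        void cancel() { subscription.cancel(); }
    }

    static List<Integer> demo() {
        Collector<Integer> c = subscribeWith(range(1, 5), new Collector<Integer>());
        c.request(2);  // receives 1 and 2
        c.cancel();    // cancelled via the returned subscriber
        c.request(3);  // ignored after cancellation
        return c.items;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because subscribe itself cannot return anything, the handle for cancellation has to live on the subscriber, which is why returning the subscriber is enough to restore the old pattern.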