I have the following observable:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool( 1 );
Observable<List<Widget>> findWidgetsObservable = Observable.create( emitter -> {
executorService.scheduleWithFixedDelay( emitFindWidgets( emitter, 0, 30, TimeUnit.SECONDS );
} );
private Runnable emitFindWidgets( ObservableEmitter<List<Widgets>> emitter ) {
return () -> {
emitter.onNext( Collections.emptyList() ); // dummy empty array
};
}
And I'm returning it in a graphql-java subscription resolver like so:
ConnectableObservable<List<Widget>> connectableObservable = findWidgetsObservable.share().publish();
Disposable connectionDisposable = connectableObservable.connect();
return connectableObservable.toFlowable( BackpressureStrategy.LATEST )
The graphql subscription works as expected and emits data to the JavaScript graphql client, but when the client unsubscribes, my Runnable continues seemingly infinitely. That said, the flowable's doOnCancel() event handler IS being run.
In order to remedy this problem, I've attempted to do the following within the flowable's doOnCancel():
Disposable connectionDisposable = connectableObservable.connect();
return connectableObservable.toFlowable( BackpressureStrategy.LATEST ).doOnCancel( () -> {
findWidgetsObservable.toFuture().cancel( true );
connectionDisposable.dispose();
})
However, the Runnable continues omitting indefinitely. Is there any way I can solve this problem and completely stop the emits?
I did have one thought: scheduleWithFixedDelay returns a ScheduledFuture, which has a cancel() method, but I'm not sure that there's anyway I can do that when the scheduling itself is scoped within an observable! Any help is appreciated.
The runnable keeps on emitting because you are scheduling the emission on a scheduler that is not known/bound to observable stream.
When you dispose your connection, you stop receiving the items from upstream because the connection to upstream observable is cut. But since you are scheduling the emitter to run repeatedly on a separate scheduler, the runnable keeps running.
You can describe the custom scheduling behavior using a custom scheduler and passing it in subscribeOn(Your-Custom-Scheduler)
Also, you mentioned you can invoke cancel()
on ScheduledFuture
in doOnDispose()
.
But you should switch schedulers explicitly in the observable chain. Otherwise, it becomes harder to debug.