I'm working on a small subsystem that integrates two simple components using RxJava 2. The two components work in a simple client-server manner: the first component produces observable data and opens a resource under the hood. The resource is not exposed to the second component. It must stay open for as long as the observable is in use, but the observable itself cannot determine when the resource should be closed. In code, an example implementation looks like this:
    private Disposable disposable;

    public void onCreate() {
        final Maybe<Object> maybeResource = Maybe.defer(() -> {
            System.out.println("open");
            // here is the resource under the hood, it is encapsulated in the observable and never gets exposed
            final Closeable resource = () -> { };
            return Maybe.just(resource)
                    .doOnDispose(() -> {
                        // this "destructor" is never called, resulting in a resource leak
                        System.out.println("close");
                        resource.close();
                    })
                    // arbitrary data, does not represent the data I'm working with, but it hides the resource away
                    .map(closeable -> new Object());
        });
        disposable = maybeResource.subscribe(data -> System.out.println("process: " + data));
    }

    public void onUserWorkflow() {
        // ...
        System.out.println("... ... ...");
        // ...
    }

    public void onDestroy() {
        disposable.dispose();
    }
The output I'd expect is:

    open
    process: <...>
    ... ... ...
    close <-- this is never produced

but the last line, close, is never produced, because the doOnDispose method is not invoked and does not work the way I thought it would.
Therefore the resource is never released.
There is also Maybe.using, which does a similar thing, but it does not allow the resource to "span" across the "user workflow".
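To illustrate the limitation, here is a minimal sketch, assuming RxJava 2 (`io.reactivex`) on the classpath. The class name `MaybeUsingSketch`, the `run()` helper and the `log` list are hypothetical and only there to record the order of events instead of printing them. As far as I can tell, `Maybe.using` releases the resource as soon as the `Maybe` terminates, so "close" is recorded before the user-workflow step rather than after it.

```java
import io.reactivex.Maybe;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class MaybeUsingSketch {

    // Hypothetical helper: returns the recorded event order.
    static List<String> run() {
        final List<String> log = new ArrayList<>();

        // Opens the (stand-in) resource; closing it is recorded in the log.
        final Callable<Closeable> open = () -> {
            log.add("open");
            return () -> log.add("close");
        };
        // Hides the resource behind arbitrary data, as in the question.
        final Function<Closeable, Maybe<Object>> source = resource -> Maybe.just(new Object());
        // Called by Maybe.using when the Maybe terminates or is disposed.
        final Consumer<Closeable> close = Closeable::close;

        final Maybe<Object> maybeResource = Maybe.using(open, source, close);

        maybeResource.subscribe(data -> log.add("process"));
        // The "user workflow" happens only after the Maybe has already terminated.
        log.add("... ... ...");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the resource is already closed by the time the workflow line is logged, which is exactly why `Maybe.using` does not fit the use case described above.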
Is there an RxJava/RxJava 2 way to manage "closeable" resources so that they are closed when the subscriber is disposed?
I guess you need to use Observable.create() instead of Maybe.
Something like this:
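    final Observable<Object> resourceObservable = Observable.create(emitter -> {
        // do your stuff: open the resource here so it stays hidden inside the observable
        final Closeable resource = () -> { };
        // register the cleanup; it runs when the subscriber disposes
        emitter.setCancellable(() -> {
            System.out.println("close");
            resource.close();
        });
        emitter.onNext(new Object()); // to make the observable emit something
    });
    disposable = resourceObservable.subscribe(data -> System.out.println("process: " + data));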
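For completeness, here is a self-contained sketch of the whole lifecycle, assuming RxJava 2 (`io.reactivex`) on the classpath. The class name `ResourceLifecycleSketch`, the `run()` helper and the `log` list are hypothetical; they replace the `System.out.println` calls so the order of events can be checked. My understanding of why the original `doOnDispose` never fires is that `Maybe.just` terminates immediately with `onSuccess`, and disposing an already terminated stream does nothing. The observable below never calls `onComplete`, so `dispose()` still reaches the registered cancellable.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

public class ResourceLifecycleSketch {

    // Hypothetical helper: plays through onCreate / onUserWorkflow / onDestroy
    // and returns the recorded event order.
    static List<String> run() {
        final List<String> log = new ArrayList<>();

        final Observable<Object> resourceObservable = Observable.create(emitter -> {
            log.add("open");
            // Stand-in for the real resource; closing it is recorded in the log.
            final Closeable resource = () -> log.add("close");
            // Runs when the subscriber disposes (or the stream terminates).
            emitter.setCancellable(resource::close);
            // Emit arbitrary data and deliberately do NOT complete,
            // so the subscription stays active until dispose() is called.
            emitter.onNext(new Object());
        });

        // onCreate
        final Disposable disposable = resourceObservable.subscribe(data -> log.add("process"));
        // onUserWorkflow
        log.add("... ... ...");
        // onDestroy
        disposable.dispose();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

One design note: because the observable never completes, the caller has to call `dispose()` eventually, otherwise the resource stays open.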