Search code examples
androidrx-java2

DisposableObserver dispose() not working (updates still received)


My callback (see onResume() below) is still getting called even after calling dispose() in onPause() of MyFragment.java. Why?

I don't think it's important, but: I call NetworkUtils.subscribeToAvgPriceUpdates() from multiple Fragments (only one is visible at once). In each Fragment I have one DisposableObserver and then when I switch to that fragment, I subscribe to data updates using that observer.

NetworkUtils.java:

public static void subscribeToAvgPriceUpdates(DisposableObserver<List<Result>> observer, CoinPriceUpdateCallback callback) {
    if(observer != null && !observer.isDisposed())
        observer.dispose();

    observer = new DisposableObserver<List<Result>>() {
        @Override
        public void onNext(List<Result> results) {
            callback.onUpdated(results);
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() { }
    };

    MainApplication.apiProvider.getAPI().getAllTickersRx()
            .repeatWhen(objectObservable -> objectObservable.delay(15000, TimeUnit.MILLISECONDS) )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
}

MyFragment.java:

private DisposableObserver<List<Result>> tickerUpdateObserver;

@Override
public void onPause () {
    if(tickerUpdateObserver != null)
        tickerUpdateObserver.dispose();

    super.onPause();
}

@Override
public void onResume() {
    super.onResume();

    NetworkUtils.subscribeToAvgPriceUpdates(tickerUpdateObserver, results -> {
        // still getting called even after I switch to another fragment, why?
        // shouldn't .dispose() in onPause() stop the updates?
    });
}

Solution

  • The problem is that you create a new DisposableObserver inside the method but the field tickerUpdateObserver remains the same first instance, thus you don't have a reference to the newer observers.

    You could just return the new DisposableObserver from the method and update the field:

    public static DisposableObserver<List<Result>> subscribeToAvgPriceUpdates(
            DisposableObserver<List<Result>> observer, 
            CoinPriceUpdateCallback callback) {
        if(observer != null && !observer.isDisposed())
            observer.dispose();
    
        observer = new DisposableObserver<List<Result>>() {
            @Override
            public void onNext(List<Result> results) {
                callback.onUpdated(results);
            }
    
            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
    
            @Override
            public void onComplete() { }
        };
    
        MainApplication.apiProvider.getAPI().getAllTickersRx()
            .repeatWhen(objectObservable -> 
                objectObservable.delay(15000, TimeUnit.MILLISECONDS) )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
    
        return observer;
    }
    
    @Override
    public void onResume() {
        super.onResume();
    
        tickerUpdateObserver = NetworkUtils.subscribeToAvgPriceUpdates(
                tickerUpdateObserver, results -> {
            // still getting called even after I switch to another fragment, why?
            // shouldn't .dispose() in onPause() stop the updates?
        });
    }