Search code examples
kotlinobservablerx-java2dispose

Receiving items in onNext() after calling dispose() in RxJava


This is what I read about dispose() here:

In a nutshell, when the Disposable (which is implemented by the TestObserver) gets disposed, the Observer (also TestObserver) will no longer receive values from the Observable.

Here is my code:

private fun createObservableWithDisposable() {
    Observable
            .create { e: ObservableEmitter<String> ->
                val worker = Schedulers.io().createWorker()
                e.setDisposable(worker)
                worker.schedule {
                    for (i in 1..5) {
                        if (i == 3) {
                            worker.dispose()
                            // https://medium.com/@vanniktech/rxjava-2-disposable-under-the-hood-f842d2373e64
                            // After calling dispose(), the subscriber no longer receives items passed in OnNext().
                            // But it doesn't work in my code
                        }
                        e.onNext("Event $i on thread ${Thread.currentThread().name}")
                    }
                }
            }
            .subscribe(
                    { s ->
                        Log.d(TAG, "createObservableWithDisposable onNext msg=$s")
                    },
                    { e ->
                        Log.d(TAG, "createObservableWithDisposable", e)
                    },
                    {
                        Log.d(TAG, "createObservableWithDisposable onComplete")
                    }
            )
}

And this is what I see in Logcat:

2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 1 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 2 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 3 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 4 on thread RxCachedThreadScheduler-1
2019-02-25 08:10:53.414 12071-12101/ru.sample D/RxJavaSamples: createObservableWithDisposable onNext msg=Event 5 on thread RxCachedThreadScheduler-1

I expected to see only the first two emissions. I.e., I thought that after calling dispose() onNext() won't be called.


Solution

  • you disposed worker which had been providing items, not subscriber.

    To stop receiving items, try to

    val compositeDisposable = CompositeDisposable()
        compositeDisposable.add(
                Observable
                        .create { e: ObservableEmitter<String> ->
                            val worker = Schedulers.io().createWorker()
                            e.setDisposable(worker)
                            worker.schedule {
                                for (i in 1..5) {
                                    if (i == 3) {
                                        compositeDisposable.dispose() //changed here
                                    }
                                    e.onNext("Event $i on thread ${Thread.currentThread().name}")
                                }
                            }
    
                        }
                        .subscribe(
                                { s ->
                                    Log.d(TAG, "createObservableWithDisposable onNext msg=$s")
                                },
                                { e ->
                                    Log.d(TAG, "createObservableWithDisposable", e)
                                },
                                {
                                    Log.d(TAG, "createObservableWithDisposable onComplete")
                                }
    
                        )
        )