Case: I want to observe items for the next 5 seconds, but I also want to be able to stop receiving them manually.
I have this code:
private var disposable: Disposable? = null
...
observableThatHasToBeAliveAllTime
    .switchMap {
        observableThatEmitsItemOver5SecsWhenUpperObsEmits()
            .takeUntil(
                Observable.timer(5, TimeUnit.SECONDS)
                    .also { disposable = it.subscribe() }
            )
            .switchMap { /* some work */ }
    }
    .subscribe { /* handle result */ }
Elsewhere I call disposable?.dispose(), but takeUntil() keeps working.
What am I doing wrong?
That it.subscribe() subscribes to the timer completely separately from takeUntil, so disposing it has no effect on the chain. Also, takeUntil stops only on an onNext or onComplete signal from the other source, so dispose would not work as expected either.
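To see why the dispose call has no effect, here is a minimal sketch that reproduces the pattern from the question. It assumes RxJava 3 package names (the question does not say which version is in use), replaces the inner observable with a hypothetical one-item-per-second Observable.interval, and uses a TestScheduler so that time is virtual. The function name disposeSeparateSubscription is made up for this illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Returns: (items received 3 s after dispose, completed at 3 s?, completed at 10 s?)
fun disposeSeparateSubscription(): Triple<Int, Boolean, Boolean> {
    val scheduler = TestScheduler() // virtual time, so nothing really waits
    var disposable: Disposable? = null
    val received = mutableListOf<Long>()
    var completed = false

    // Hypothetical stand-in for the inner stream: one item per second.
    Observable.interval(1, TimeUnit.SECONDS, scheduler)
        .takeUntil(
            Observable.timer(5, TimeUnit.SECONDS, scheduler)
                .also { disposable = it.subscribe() } // subscription A, standalone
        ) // takeUntil makes its own subscription B to the same cold timer
        .subscribe({ received.add(it) }, { it.printStackTrace() }, { completed = true })

    disposable?.dispose() // cancels subscription A only; B is untouched

    scheduler.advanceTimeBy(3, TimeUnit.SECONDS)
    val itemsAfter3s = received.size
    val completedAfter3s = completed

    scheduler.advanceTimeBy(7, TimeUnit.SECONDS) // now at 10 s of virtual time
    return Triple(itemsAfter3s, completedAfter3s, completed)
}

fun main() {
    println(disposeSeparateSubscription())
}
```

With these assumptions the result should be (3, false, true): items keep arriving after dispose(), and the stream still ends only once the timer that takeUntil subscribed to itself fires.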
You could use a separate subject with another takeUntil to stop the flow:
val stop = PublishSubject.create<Any>()

observableThatHasToBeAliveAllTime
    .switchMap {
        observableThatEmitsItemOver5SecsWhenUpperObsEmits()
            .takeUntil(
                Observable.timer(5, TimeUnit.SECONDS)
            )
            .takeUntil(stop) // ends the inner flow as soon as stop emits
            .switchMap { /* some work */ }
    }
    .subscribe { /* handle result */ }

// Elsewhere, to stop manually:
stop.onNext("Stop!")
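The subject-based approach can be tried in isolation with the sketch below. It again assumes RxJava 3 imports and a TestScheduler for virtual time, and it swaps the inner observable for a hypothetical Observable.interval. The function name runWithManualStop is invented for the example.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Returns the items received and whether the stream completed.
fun runWithManualStop(): Pair<List<Long>, Boolean> {
    val scheduler = TestScheduler() // virtual time
    val stop = PublishSubject.create<Any>()
    val received = mutableListOf<Long>()
    var completed = false

    // Hypothetical stand-in for the inner stream: one item per second.
    Observable.interval(1, TimeUnit.SECONDS, scheduler)
        .takeUntil(Observable.timer(5, TimeUnit.SECONDS, scheduler)) // automatic cutoff
        .takeUntil(stop)                                             // manual cutoff
        .subscribe({ received.add(it) }, { it.printStackTrace() }, { completed = true })

    scheduler.advanceTimeBy(2, TimeUnit.SECONDS)  // items 0 and 1 arrive
    stop.onNext("Stop!")                          // manual stop before the 5 s timer
    scheduler.advanceTimeBy(10, TimeUnit.SECONDS) // nothing more should arrive

    return received.toList() to completed
}

fun main() {
    val (items, completed) = runWithManualStop()
    println("received=$items completed=$completed")
}
```

Under these assumptions the stream should complete right after the manual stop with only the first two items received. Note that in the original chain the same stop subject is shared by every inner stream created inside switchMap, so a single onNext ends whichever inner stream is currently active.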