android observable rx-java rx-java2

Stop Observable.timer() inside takeUntil() manually


Case: I want to observe items for the next 5 seconds, but I also want to be able to stop receiving them manually before that time is up.

I have this code


private var disposable: Disposable? = null

...

observableThatHasToBeAliveAllTime
    .switchMap {
        observableThatEmitsItemOver5SecsWhenUpperObsEmits()
            .takeUntil(
                Observable.timer(5, TimeUnit.SECONDS)
                    .also { disposable = it.subscribe() }
            )
            .switchMap { /* some work */ }
    }
    .subscribe { /* handle result */ }

Elsewhere I call disposable?.dispose(), but takeUntil() keeps working.

What am I doing wrong?


Solution

  • What am I doing wrong?

    The call to it.subscribe() subscribes to the timer completely separately from takeUntil, so disposing that subscription has no effect on the flow. In addition, takeUntil stops only when the other source signals onNext or onComplete, and dispose produces neither, so it would not work as expected either.

    You could use a separate subject with another takeUntil to stop the flow:

    // Manual stop signal; Any is the Kotlin counterpart of java.lang.Object
    val stop = PublishSubject.create<Any>()
    
    observableThatHasToBeAliveAllTime
        .switchMap {
            observableThatEmitsItemOver5SecsWhenUpperObsEmits()
                .takeUntil(
                    Observable.timer(5, TimeUnit.SECONDS)
                )
                .takeUntil(stop)
                .switchMap { /* some work */ }
        }
        .subscribe { /* handle result */ }
    
    // Later, wherever the flow should be stopped manually:
    stop.onNext("Stop!")
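
To see both points in isolation, here is a minimal sketch, assuming RxJava 2 and using a TestScheduler so that time is virtual and nothing actually waits. The function collectUntilStopped and the Observable.interval source are hypothetical stand-ins for the real observables in the question, not part of the original code.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Hypothetical demo: runs a one-item-per-second source under a virtual clock
// and returns the items received before the flow ended.
fun collectUntilStopped(stopAtSeconds: Long?): List<Long> {
    val scheduler = TestScheduler()          // virtual time
    val stop = PublishSubject.create<Any>()  // manual stop signal
    val received = mutableListOf<Long>()

    val timer = Observable.timer(5, TimeUnit.SECONDS, scheduler)

    // A separate subscription, as in the question. Disposing it does not
    // touch the subscription that takeUntil makes to the timer on its own.
    timer.subscribe().dispose()

    Observable.interval(1, TimeUnit.SECONDS, scheduler) // stand-in source
        .takeUntil(timer)  // automatic cutoff after 5 virtual seconds
        .takeUntil(stop)   // manual cutoff
        .subscribe { received.add(it) }

    if (stopAtSeconds != null) {
        scheduler.advanceTimeBy(stopAtSeconds, TimeUnit.SECONDS)
        stop.onNext("Stop!") // ends the flow immediately
    }
    // Run the clock well past the 5-second limit.
    scheduler.advanceTimeTo(10, TimeUnit.SECONDS)
    return received
}

fun main() {
    // Stopped manually after 2 virtual seconds.
    println(collectUntilStopped(stopAtSeconds = 2L))
    // Never stopped manually: the timer inside takeUntil still ends the flow,
    // even though the separate subscription was disposed.
    println(collectUntilStopped(stopAtSeconds = null))
}
```

In the second call the source would emit ten items over ten virtual seconds if nothing stopped it, so receiving at most five suggests that disposing the separate subscription left the timer inside takeUntil untouched.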