Search code examples
swift3observablerx-swiftreactivexrx-cocoa

RxSwift: Repeat a (completed) stream


Assume I have a button which can be used to start and stop (toggle) an action.

let toggleStream: Observable<Bool> = toggleBtn.rx.tap.scan(false) { state, _ in !state }

I have another stream, that emits Integers continuously.

let emitter = Observable<Int>.interval(2.0, scheduler: timerScheduler)

Now I want to use the toggle stream to start and stop the emitting of the second stream. This is my approach:

Observable.combineLatest(toggleStream, emitter) { shouldEmit, evt in
    return (shouldEmit, evt)
}.takeWhile{ (shouldEmit, evt:Int) in
    return shouldEmit == true
}.map {(_, evt) in
    return evt
}

This works great for the first time. I can press the button and the Observable starts emitting its Ints. Also stopping works. Sadly I can't start it for a second time, because the stream is completed. How can I restart/retry/repeat it when the user toggles the button again?


Solution

  • Here's how I did it in the playground. You should be able to extrapolate:

    //: Playground - noun: a place where people can play
    
    import RxSwift
    
    let toggleButton = PublishSubject<Void>()
    let toggleStream: Observable<Bool> = toggleButton
        .scan(false) { state, _ in !state }
        .debug()
        .shareReplayLatestWhileConnected()
    
    let emit = toggleStream
        .filter { $0 }
        .flatMapLatest { _ in
            Observable<Int>.interval(2.0, scheduler: MainScheduler.instance)
                .takeUntil(toggleStream.filter { !$0 })
        }
    
    
    _ = emit.subscribe( {
        print($0)
    })
    
    toggleButton.onNext()
    DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 5.0) {
        toggleButton.onNext()
    }
    DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 11.0) {
        toggleButton.onNext()
    }
    DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 17.0) {
        toggleButton.onNext()
    }
    
    import PlaygroundSupport
    PlaygroundPage.current.needsIndefiniteExecution = true