
How to drop new elements if an observer is busy?


I have an observable that emits elements at a regular interval. On each element I perform one fast and one slow operation. I want the slow observer to drop new elements while it is busy. Is there a way to achieve this with Rx rather than keeping a flag in the slow operation?

I am very new to Reactive Extensions, so please correct me if any of my assumptions are wrong.

let tick = Observable<Int>
    .interval(.seconds(1), scheduler: SerialDispatchQueueScheduler(qos: .background))
    .share()

tick.subscribe {
    print("fast observer \($0)")
}.disposed(by: disposeBag)

// observing in another queue so that it does not block the source
tick.observeOn(SerialDispatchQueueScheduler(qos: .background))
    .subscribe {
        print("slow observer \($0)")
        sleep(3) // cpu-intensive task
    }.disposed(by: disposeBag)


Solution

  • For this, the flatMap family of operators is your friend. Whenever you want to drop events, either the current one when a new one comes in (flatMapLatest) or subsequent ones while the current one is still being processed (flatMapFirst), reach for one of these operators. More information can be found in my article: RxSwift’s Many Faces of FlatMap

    Here you go:

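    import Foundation // for sleep
    import RxSwift
    
    // Assumed to live as long as the subscriptions should stay active.
    let disposeBag = DisposeBag()
    
    let tick = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).share()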
    
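    // Wraps a blocking, CPU-heavy job in an Observable that emits its input once the job is done.
    // The work runs on whichever scheduler the Observable is subscribed on.
    func cpuLongRunningTask(_ input: Int) -> Observable<Int> {
        return Observable.create { observer in
            print("start task")
            sleep(3) // stand-in for the CPU-intensive work
            print("finish task")
            observer.onNext(input)
            observer.onCompleted()
            return Disposables.create { /* cancel the task if possible */ }
        }
    }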
    
    tick
        .subscribe {
            print("fast \($0)")
        }
        .disposed(by: disposeBag)
    
    tick
        // flatMapFirst ignores new elements until the current inner task completes
        .flatMapFirst {
            // subscribing in another scheduler so that it does not block the source
            cpuLongRunningTask($0)
                .subscribeOn(SerialDispatchQueueScheduler(qos: .background))
        }
        .observeOn(MainScheduler.instance) // make sure the print happens on the main thread
        .subscribe {
            print("slow \($0)")
        }
        .disposed(by: disposeBag)
    

    Sample output as follows:

    fast next(0)
    start task
    fast next(1)
    fast next(2)
    fast next(3)
    finish task
    slow next(0)
    fast next(4)
    start task
    fast next(5)
    fast next(6)
    fast next(7)
    finish task
    slow next(4) <-- slow ignored the 1, 2, and 3 values.
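
To see the dropping behaviour without depending on timers or `sleep`, here is a minimal sketch that feeds elements by hand and compares `flatMapFirst` with `flatMapLatest`. The names `runDemo`, `ticks`, and `taskDone` are made up for this illustration and are not part of the code above; `taskDone` is a hypothetical signal that stands in for "the slow task has finished".

```swift
import RxSwift

/// Drives a hand-fed source through `flatMapFirst` or `flatMapLatest`.
/// `started` lists the elements for which a task began; `delivered` lists
/// the results that reached the subscriber.
func runDemo(useLatest: Bool) -> (started: [Int], delivered: [Int]) {
    let disposeBag = DisposeBag()
    let ticks = PublishSubject<Int>()     // stands in for the interval source
    let taskDone = PublishSubject<Void>() // hypothetical "task finished" signal
    var started: [Int] = []
    var delivered: [Int] = []

    // A fake task: it stays busy until the next taskDone signal, then emits its input.
    let task: (Int) -> Observable<Int> = { value in
        started.append(value)
        return taskDone.take(1).map { _ in value }
    }

    let results = useLatest ? ticks.flatMapLatest(task) : ticks.flatMapFirst(task)
    results
        .subscribe(onNext: { delivered.append($0) })
        .disposed(by: disposeBag)

    ticks.onNext(0)      // nothing is running, so a task starts for 0
    ticks.onNext(1)      // arrives while a task is still busy
    ticks.onNext(2)      // arrives while a task is still busy
    taskDone.onNext(())  // the active task finishes
    ticks.onNext(3)      // nothing is running, so a task starts for 3
    taskDone.onNext(())  // the task for 3 finishes

    return (started, delivered)
}

print(runDemo(useLatest: false)) // flatMapFirst
print(runDemo(useLatest: true))  // flatMapLatest
```

In this sketch, `flatMapFirst` should start tasks only for 0 and 3, because 1 and 2 arrive while the task for 0 is still active and the closure is never called for them. `flatMapLatest` starts a task for every element but cancels the previous one each time, so only the results for 2 and 3 reach the subscriber. For the question as asked, where the slow work should not be restarted, `flatMapFirst` is the one that fits.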