I have an observable that regularly emits elements. On each element I perform one fast and one slow operation. I want to drop new elements for the slow observer while it is busy. Is there a way to achieve this with Rx instead of keeping a flag in the slow operation?
I am very new to Reactive Extensions; please correct me if any of my assumptions are wrong.
let tick = Observable<Int>.interval(.seconds(1),
                                    scheduler: SerialDispatchQueueScheduler(qos: .background)).share()

tick.subscribe {
    print("fast observer \($0)")
}.disposed(by: disposeBag)

// observing in another queue so that it does not block the source
tick.observeOn(SerialDispatchQueueScheduler(qos: .background))
    .subscribe {
        print("slow observer \($0)")
        sleep(3) // cpu-intensive task
    }.disposed(by: disposeBag)
For this, the flatMap family is your friend. Whenever you want to drop events, use one of its variants: flatMapLatest abandons the current work when a new event comes in, and flatMapFirst ignores subsequent events while it is still working on the current one. More information can be found in my article: RxSwift’s Many Faces of FlatMap
Here you go:
let tick = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).share()

func cpuLongRunningTask(_ input: Int) -> Observable<Int> {
    return Observable.create { observer in
        print("start task")
        sleep(3) // cpu-intensive task
        observer.onNext(input)
        print("finish task")
        observer.onCompleted() // lets flatMapFirst accept the next element
        return Disposables.create { /* cancel the task if possible */ }
    }
}

tick
    .subscribe {
        print("fast \($0)")
    }
    .disposed(by: disposeBag)

tick
    .flatMapFirst {
        // subscribing in another scheduler so that it does not block the source
        cpuLongRunningTask($0)
            .subscribeOn(SerialDispatchQueueScheduler(qos: .background))
    }
    .observeOn(MainScheduler.instance) // make sure the print happens on the main thread
    .subscribe {
        print("slow \($0)")
    }
    .disposed(by: disposeBag)
Sample output as follows:
fast next(0)
start task
fast next(1)
fast next(2)
fast next(3)
finish task
slow next(0)
fast next(4)
start task
fast next(5)
fast next(6)
fast next(7)
finish task
slow next(4) <-- slow ignored the 1, 2, and 3 values.
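If you want to see the drop-while-busy behaviour without timers or background queues, here is a minimal sketch that drives flatMapFirst by hand. It assumes RxSwift is available to import; the names ticks, inFlight, started and results are made up for this illustration, and a PublishSubject stands in for the long-running task so that "finishing" it is just a call to onCompleted.

```swift
import RxSwift

let disposeBag = DisposeBag()
let ticks = PublishSubject<Int>()   // stands in for the interval source
var inFlight: PublishSubject<Int>?  // hypothetical handle to the running "task"
var started: [Int] = []             // ticks that actually started a task
var results: [Int] = []             // values the slow observer received

ticks
    .flatMapFirst { value -> Observable<Int> in
        // Only called when no earlier inner observable is still active.
        started.append(value)
        let task = PublishSubject<Int>()
        inFlight = task
        return task.asObservable()
    }
    .subscribe(onNext: { results.append($0) })
    .disposed(by: disposeBag)

ticks.onNext(0)          // starts task 0
ticks.onNext(1)          // dropped: task 0 is still running
ticks.onNext(2)          // dropped as well
inFlight?.onNext(0)      // task 0 delivers its result...
inFlight?.onCompleted()  // ...and finishes, so the next tick is accepted
ticks.onNext(3)          // starts task 3

print(started)  // [0, 3]
print(results)  // [0]
```

The point of the sketch is that the closure passed to flatMapFirst is never invoked for 1 and 2, so no flag is needed inside the slow operation: completion of the inner observable is what reopens the gate.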