Search code examples
swiftmacosrx-swiftreactivex

RxSwift - doing control flow leads to async operation being performed twice


Take a look at this example:

func query2() -> Observable<Int> {
    print("query2() called")
    return Observable.create { observer in
        print("creating query2() thread")
        let thread = Thread.init(block: {
            sleep(1)
            let numbers = [
                1,2,3,4,5
            ]
            for num in numbers {
                observer.onNext(num)
            }
            observer.onCompleted()
        })
        thread.start()
        return Disposables.create {
            thread.cancel()
        }
    }
}

let numbers = query2()
let even = numbers.filter { $0 % 2 == 0 }
let odd = numbers.filter { $0 % 2 != 0 }
let merged = even.concat(odd)

merged.subscribe(onNext: { n in
    print(n)
})

The expected output is:

query2() called
creating query() thread
2
4
1
3
5

However, the thread seems to be created a second time when it comes time to pull values from odd.

Actual Output:

query2() called
creating query2() thread
2
4
creating query2() thread
1
3
5

I looked at this code, and thought - ah I missed the .share() operator, since the even and odd are derived from the same stream. I did not add that in originally because I eventually merge those into a single stream merged to be subscribed on, and thought that Rx would do the optimization for me.

So I used share(): let numbers = query2().share()

The output still remained the same.

How can I prevent this from happening?


Solution

  • The closure passed to Observable.create is called every time the resultant Observable is subscribed to.

    The share() operator has a reference count. It will subscribe to its source when it receives a subscribe request, then if it receives another request while the source is running it will dispatch events to the second source as well.

    The surprise here is that your observable completes before the first subscribe returns, so there is nothing to share. Note that when you call onNext inside your background thread, it immediately calls the closure passed into subscribe within that background thread. When you call onCompleted it immediately completes. It doesn't wait for the subscribe to exit to do these things.

    The solution here is to make your numbers observable hot with the multicast operator. Pass it a ReplaySubject which will store the output and replay it to any subsequent subscribers. Don't forget to call connect() to cause the Observable to run its generator function.

    Something like this:

    let numbers = Observable.deferred { () -> Observable<Int> in
        print("called")
        return Observable.from([1, 2, 3, 4, 5, 6])
    }
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .default)) // this ensures that the block passed to `deferred` is called on a background thread.
    .multicast(ReplaySubject<Int>.createUnbounded())
    
    numbers.connect()
    
    let even = numbers.filter { $0 % 2 == 0 }
    let odd = numbers.filter { $0 % 2 != 0 }
    let merged = even.concat(odd)
    
    merged.subscribe(onNext: { print($0) })