Take a look at this example:
func query2() -> Observable<Int> {
print("query2() called")
return Observable.create { observer in
print("creating query2() thread")
let thread = Thread.init(block: {
sleep(1)
let numbers = [
1,2,3,4,5
]
for num in numbers {
observer.onNext(num)
}
observer.onCompleted()
})
thread.start()
return Disposables.create {
thread.cancel()
}
}
}
let numbers = query2()
let even = numbers.filter { $0 % 2 == 0 }
let odd = numbers.filter { $0 % 2 != 0 }
let merged = even.concat(odd)
merged.subscribe(onNext: { n in
print(n)
})
The expected output is:
query2() called
creating query() thread
2
4
1
3
5
However, the thread seems to be created a second time when it comes time to pull values from odd
.
Actual Output:
query2() called
creating query2() thread
2
4
creating query2() thread
1
3
5
I looked at this code, and thought - ah I missed the .share()
operator, since the even
and odd
are derived from the same stream. I did not add that in originally because I eventually merge those into a single stream merged
to be subscribed on, and thought that Rx would do the optimization for me.
So I used share(): let numbers = query2().share()
The output still remained the same.
How can I prevent this from happening?
The closure passed to Observable.create
is called every time the resultant Observable is subscribed to.
The share()
operator has a reference count. It will subscribe to its source when it receives a subscribe request, then if it receives another request while the source is running it will dispatch events to the second source as well.
The surprise here is that your observable completes before the first subscribe returns, so there is nothing to share. Note that when you call onNext
inside your background thread, it immediately calls the closure passed into subscribe within that background thread. When you call onCompleted
it immediately completes. It doesn't wait for the subscribe to exit to do these things.
The solution here is to make your numbers
observable hot with the multicast operator. Pass it a ReplaySubject which will store the output and replay it to any subsequent subscribers. Don't forget to call connect()
to cause the Observable to run its generator function.
Something like this:
let numbers = Observable.deferred { () -> Observable<Int> in
print("called")
return Observable.from([1, 2, 3, 4, 5, 6])
}
.subscribe(on: ConcurrentDispatchQueueScheduler(qos: .default)) // this ensures that the block passed to `deferred` is called on a background thread.
.multicast(ReplaySubject<Int>.createUnbounded())
numbers.connect()
let even = numbers.filter { $0 % 2 == 0 }
let odd = numbers.filter { $0 % 2 != 0 }
let merged = even.concat(odd)
merged.subscribe(onNext: { print($0) })