I'm observing an unexpected behavior regarding CombineLatest, if the inner publishers has subscribe(on:)
, the CombineLatest stream is not emitting any value.
Notes:
func makePublisher() -> AnyPublisher<Int, Never> {
Deferred {
Future { promise in
DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 3) {
promise(.success(Int.random(in: 0...3)))
}
}
}
.subscribe(on: DispatchQueue.global())
.receive(on: DispatchQueue.main)
.eraseToAnyPublisher()
}
var cancellables = Set<AnyCancellable>()
Publishers.CombineLatest(
makePublisher(),
makePublisher()
)
.sink { completion in
print(completion)
} receiveValue: { (a, b) in
print(a, b)
}.store(in: &cancellables)
Is this a combine bug or expected behavior? Do you have any idea of how can be setup this kind of stream where the inners can define its own subscribe scheduler?
Yes, it's a bug. We can simplify the test case to this:
import Combine
import Dispatch
let pub = Just("x")
.subscribe(on: DispatchQueue.main)
let ticket = pub.combineLatest(pub)
.sink(
receiveCompletion: { print($0) },
receiveValue: { print($0) })
This never prints anything. But if you comment out the subscribe(on:)
operator, it prints what's expected. If you leave subscribe(on:)
in, but insert some print()
operators, you'll see that the CombineLatest
operator never sends any demand upstream.
I suggest you copy the CombineX reimplementation of CombineLatest
and the utilities it needs to compile (the CombineX implementations of Lock
and LockedAtomic
, I think). I don't know that the CombineX version works either, but if it's buggy, at least you have the source and can try to fix it.