Search code examples
iosswiftcombinecombinelatest

CombineLatest operator is not emitting when inners publishers use subscribe(on:)


I'm observing an unexpected behavior regarding CombineLatest, if the inner publishers has subscribe(on:), the CombineLatest stream is not emitting any value.

Notes:

  • With Zip operator is working
  • Moving the subscribe(on:) / receive(on:) to the combineLatest stream also work. But in this particular use case, the inner publishers is defining their subscribe/receive because are (re)used in other places.
  • Adding subscribe(on:)/receive(on:) to only one of the inner publishers also work, so the problem is just when both have it.
    func makePublisher() -> AnyPublisher<Int, Never> {
        Deferred {
            Future { promise in
                DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 3) {
                    promise(.success(Int.random(in: 0...3)))
                }
            }
        }
        .subscribe(on: DispatchQueue.global())
        .receive(on: DispatchQueue.main)
        .eraseToAnyPublisher()
    }
    
    var cancellables = Set<AnyCancellable>()
    Publishers.CombineLatest(
        makePublisher(),
        makePublisher()
    )
    .sink { completion in
        print(completion)
    } receiveValue: { (a, b) in
        print(a, b)
    }.store(in: &cancellables)

Is this a combine bug or expected behavior? Do you have any idea of how can be setup this kind of stream where the inners can define its own subscribe scheduler?


Solution

  • Yes, it's a bug. We can simplify the test case to this:

    import Combine
    import Dispatch
    
    let pub = Just("x")
        .subscribe(on: DispatchQueue.main)
    
    let ticket = pub.combineLatest(pub)
        .sink(
            receiveCompletion: { print($0) },
            receiveValue: { print($0) })
    

    This never prints anything. But if you comment out the subscribe(on:) operator, it prints what's expected. If you leave subscribe(on:) in, but insert some print() operators, you'll see that the CombineLatest operator never sends any demand upstream.

    I suggest you copy the CombineX reimplementation of CombineLatest and the utilities it needs to compile (the CombineX implementations of Lock and LockedAtomic, I think). I don't know that the CombineX version works either, but if it's buggy, at least you have the source and can try to fix it.