Why Does My Swift Combine Publisher Emit Unexpected Values After flatMap Transformation?


I'm working on a Swift project using the Combine framework, and I've encountered a puzzling issue with flatMap. Here’s a simplified version of the code that demonstrates the problem:

import Combine

struct MyData {
    let value: Int
}

let publisher = PassthroughSubject<MyData, Never>()

let subscription = publisher
    .flatMap { data -> AnyPublisher<MyData, Never> in
        // Simulate a network call or some async operation
        Just(data).delay(for: .seconds(1), scheduler: RunLoop.main)
            .eraseToAnyPublisher()
    }
    .sink(receiveValue: { data in
        print("Received data: \(data.value)")
    })

publisher.send(MyData(value: 1))
publisher.send(MyData(value: 2))

In the above code, I expect the sink subscriber to receive values in the order they are sent through the PassthroughSubject. However, I notice that the output sometimes appears out of order. For example, I might see:

Received data: 2
Received data: 1

This behavior seems inconsistent, and I’m not sure whether it’s due to how flatMap handles concurrency or to another underlying issue. Could someone explain why this happens and how I can ensure the values are delivered in the order they were sent?


Solution

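  • One way to fix this is to send the items as a single array and combine the per-item publishers with combineLatest. The results are then delivered together as one array whose elements are in the same order as the input, no matter how long any one network call takes.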

    Something like this:

        struct MyData {
            let value: Int
        }
    
        let publisher = PassthroughSubject<[MyData], Never>()
    
        let subscription = publisher
            .map { array in
                combineLatest(array.map { data in
                    // Simulate a network call or some async operation
                    Just(data).delay(for: .seconds(Int.random(in: 1..<5)), scheduler: RunLoop.main)
                        .eraseToAnyPublisher()
                })
            }
            .switchToLatest()
            .sink(receiveValue: { data in
                print("Received data: \(data)")
            })
    
    
        publisher.send([MyData(value: 1), MyData(value: 2)])
    

    Combine has no built-in combineLatest for arrays, so the above uses this helper function:

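    // Combines an array of publishers into one publisher that emits an array
    // of their latest values, in the same order as the input array.
    func combineLatest<Pub>(_ pubs: [Pub]) -> AnyPublisher<[Pub.Output], Pub.Failure> where Pub: Publisher {
        // With no inputs there is nothing to combine: complete without emitting.
        guard !pubs.isEmpty else { return Empty().eraseToAnyPublisher() }
        // Start with the first publisher's value wrapped in an array, then fold in the rest.
        return pubs.dropFirst().reduce(pubs[0].map { [$0] }.eraseToAnyPublisher()) { partial, next in
            partial.combineLatest(next)
                .map { $0 + [$1] }
                .eraseToAnyPublisher()
        }
    }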
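
    A quick way to check the ordering guarantee of this helper, as a minimal sketch: the two subjects below (first, second) are hypothetical stand-ins for network calls, and the second one deliberately emits before the first. The helper is repeated only so the snippet is self-contained.

    ```swift
    import Combine

    // Same helper as above, repeated so this sketch runs on its own.
    func combineLatest<Pub>(_ pubs: [Pub]) -> AnyPublisher<[Pub.Output], Pub.Failure> where Pub: Publisher {
        guard !pubs.isEmpty else { return Empty().eraseToAnyPublisher() }
        return pubs.dropFirst().reduce(pubs[0].map { [$0] }.eraseToAnyPublisher()) { partial, next in
            partial.combineLatest(next)
                .map { $0 + [$1] }
                .eraseToAnyPublisher()
        }
    }

    // Hypothetical stand-ins for two network calls.
    let first = PassthroughSubject<Int, Never>()
    let second = PassthroughSubject<Int, Never>()

    var received: [[Int]] = []
    let cancellable = combineLatest([first, second])
        .sink { received.append($0) }

    second.send(2)  // the second "call" finishes first; nothing is emitted yet
    first.send(1)   // every input now has a value, so one array is emitted

    print(received) // [[1, 2]]
    ```

    Keep in mind that combineLatest emits again whenever any input produces a new value, so this pattern fits best when each publisher emits a single value.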
    

    Another option would be to use collect(). However, collect() emits only when its upstream completes, so you have to finish the publisher before any of the network requests even start.

    let subscription = publisher
        .map { data in
            // Simulate a network call or some async operation
            Just(data).delay(for: .seconds(Int.random(in: 1..<5)), scheduler: RunLoop.main)
                .eraseToAnyPublisher()
        }
        .collect()
        .flatMap { combineLatest($0) }
        .sink(receiveValue: { data in
            print("Received data: \(data)")
        })
    
    publisher.send(MyData(value: 1))
    publisher.send(MyData(value: 2))
    publisher.send(completion: .finished)
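
    As for why the original flatMap version can reorder values: flatMap subscribes to each inner publisher as soon as the upstream value arrives and forwards inner values as they are produced, so the output follows the order in which the inner publishers finish rather than the order in which values were sent. A minimal sketch that makes this deterministic, using hypothetical subjects (requests, first, second) in place of delayed network calls:

    ```swift
    import Combine

    // Hypothetical stand-ins: each inner subject plays the role of one network call.
    let first = PassthroughSubject<Int, Never>()
    let second = PassthroughSubject<Int, Never>()
    let requests = PassthroughSubject<PassthroughSubject<Int, Never>, Never>()

    var received: [Int] = []
    let cancellable = requests
        .flatMap { $0 }                 // subscribes to every inner publisher right away
        .sink { received.append($0) }

    requests.send(first)   // "call" 1 starts
    requests.send(second)  // "call" 2 starts while call 1 is still pending

    second.send(2)  // call 2 finishes first
    first.send(1)   // call 1 finishes later

    print(received) // [2, 1]
    ```

    The values arrive in the order the inner publishers emitted them, which is the behavior the array-based approaches above are designed to avoid.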