Search code examples
swiftasync-awaitcombine

Why does Swift Combine lose publications in an asynchronous environment?


func synchronized<T>(_ lock: NSLock, closure:() throws -> T) rethrows -> T {
    lock.lock()
    let r = try closure()
    lock.unlock()
    return r
}

@objcMembers class Main : NSObject {
    static var lock = NSLock()
    static var subscribers: [Int: AnyCancellable] = [:]
    static var signal = DispatchSemaphore(value: 0)
    static func perform() {
        for i in 0 ..< 5 {
            Task {
                await subscribe(i)
            }
        }
        Task {
            try! await Task.sleep(for:.seconds(1))
            BackgroundNetwork.shared.results.append(1)
        }
        signal.wait()
    }
}

// Simulate network request, make a network request and wait for the response
func subscribe(_ i: Int) async {
    await withCheckedContinuation { ctx in
        let subscriber = BackgroundNetwork.shared.dataSource().first().sink { newValue in
            print("Pos\(i). \(newValue)")
            ctx.resume()
        }
        synchronized(Main.lock) {
            Main.subscribers[i] = subscriber
        }
    }

}

class BackgroundNetwork : ObservableObject {
    static var shared = BackgroundNetwork()
    @Published var results: [Int] = []
    
    func dataSource() -> AnyPublisher<[Int], Never> {
        return $results
            .dropFirst() // Prevent the current value from being published
            .eraseToAnyPublisher()
    }
}

I was expecting to receive five publications after executing BackgroundNetwork.shared.results.append(1), but in reality, the program might receive anywhere from one to five publications when it runs. What could be the problem with the code? I've already used locks to protect areas where there could be access conflicts.


Solution

  • As others said in the comments. Your code is a bit confusing because it's combining several different technologies, all of which are aimed at simplifying concurrency, but each of which has a very different model. Mixing them in the same code as you have done here is unusual.

    Let me try reworking your code using combine and then using async/await and see if that helps you.

    First some basic definitions:

    enum NetworkError: Error {
      case networkError(Error)
    }
    
    typealias NetworkResult = Result<String, NetworkError>
    

    In my code there is only one type of error (a network error) and for each network request I want to collect a Result that tells me whether the request succeeds or fails. I'm going to do 5 requests and collect these Result instances. For each technology there will be a function that sends a mock request, and some code that sends 5 of them and collects the results.

    Using Combine

    // Creates a mock network request that finishes after 1 - 5 seconds.
    func mockRequestFromNetworkWithCombine(returning result: String) -> Future<NetworkResult, Never> {
      return Future<NetworkResult, Never> { promise in
        let seconds = (1...5).randomElement()!
        let finish = DispatchTime.now().advanced(by: .seconds(seconds))
    
        DispatchQueue.global(qos: .background).asyncAfter(deadline: finish) {
          promise(.success(.success(result)))
        }
      }
    }
    
    func sendManyCombineRequests() -> AnyPublisher<[NetworkResult], Never> {
        (0..<5)
          .publisher
          .flatMap { index in mockRequestFromNetworkWithCombine(returning: "Combine Result \(index)") }
          .collect()
          .eraseToAnyPublisher()
    }
    
    let subscription = sendManyCombineRequests()
      .sink { results in
        debugPrint(results)
      }
    

    For mockRequestFromNetworkWithCombine I've mixed my technologies a little (Dispatch with Combine). Since it's a mock example, I thought that would be OK.

    The function returns a Future -- which is a Publisher that expected to generate a result exactly once. Things get a little weird here because I want a Publisher that is going to return a Result. Normally Future itself represents a result (either a successful return value or an error). In my case every time I execute a request I will generate a Result but the Future itself always generate a Result - it will never fail.

    It leads to the very unusual looking promise(.success(.success(result))) which means my future succeeded, returning a network result that itself also succeeded.

    sendManyCombineRequests is the bit that sends lots of requests. I generate the requests using the .publisher of a range. I use flatMap because for each index I want to issue a request to the publisher (the Future) that my mock request returns and do something with the results of those publishers. I use the collect publisher operator to gather all the results into a single array, and eraseToAnyPublisher erases the nested types that all those operators create. The result is a Publisher that, when subscribed to, will issue 5 network requests, collect a Result for each into an array and then publish that array.

    The code below that that calls sendManyCombineRequests shows how you might use the Publisher it creates.

    Using Swift Concurrency

    // Creates a mock network request that finishes after 1 - 5 seconds.
    func mockRequestFromNetworkWithConcurrency(returning result: String) async throws -> String {
      let seconds = (1...5).randomElement()!
      let finish = DispatchTime.now().advanced(by: .seconds(seconds))
    
      try await Task.sleep(for: .seconds(seconds))
    
      return result
    }
    
    func sendManyConcurrentRequests() async -> [NetworkResult] {
      await withTaskGroup(of: NetworkResult.self, returning: [NetworkResult].self) { group in
        for i in (0..<5) {
          group.addTask {
            do {
              let result = try await mockRequestFromNetworkWithConcurrency(returning: "Async/Await Request \(i)")
              return .success(result)
            } catch {
              return NetworkResult.failure(.networkError(error))
            }
          }
        }
    
        var results = Array<NetworkResult>()
        for await result in group {
          results.append(result)
        }
    
        return results
      }
    }
    
    let networkTask = Task {
      let results = await sendManyConcurrentRequests()
      debugPrint(results)
    }
    

    For the Swift Concurrency version, my mock network request function is async. It picks a random wait time, hangs around until it's passed, then returns a result. If any error had occurred, it could have thrown an error representing the problem.

    sendManyConcurrentRequests is also async. It runs 5 network requests, collects their results, and returns an array of them.

    First it creates a task group to contain the 5 requests. Again, it uses a range (this time in a for loop) and for each index adds a task that makes the network request and returns either success or failure.

    It then uses "for await result in group" to wait until all five of the network request tasks return some kind of result. It collects the results in an array as they come in then returns the array.

    The bit at the bottom shows how you might create a Task that calls sendManyConcurrentRequests.

    If these examples had "real" error handling they might be a bit more complex, but not by much. This code tries to run all 5 requests simultaneously. If you wanted to run them sequentially you would have to change things a bit.

    But note that neither of these examples requires a lock. In the Combine example, the array is put together by the collect operator that knows how to pull together an array of the results of async publishers. In the Swift Concurrency case, the Task Group manages the details of sequentially delivering the results in the for await loop.