Tags: ios, grand-central-dispatch, future, combine

Execute Combine Future in background thread is not working


If you run this in a Playground:

import Combine
import Foundation

struct User {
    let name: String
}

var didAlreadyImportUsers = false

var importUsers: Future<Bool, Never> {
    Future { promise in
        sleep(5)
        promise(.success(true))
    }
}

var fetchUsers: Future<[User], Error> {
    Future { promise in
        promise(.success([User(name: "John"), User(name: "Jack")]))
    }
}

var users: AnyPublisher<[User], Error> {
    if didAlreadyImportUsers {
        return fetchUsers
            .receive(on: DispatchQueue.global(qos: .userInitiated))
            .eraseToAnyPublisher()
    } else {
        return importUsers
            .receive(on: DispatchQueue.global(qos: .userInitiated))
            .setFailureType(to: Error.self)
            .combineLatest(fetchUsers)
            .map { $0.1 }
            .eraseToAnyPublisher()
    }
}

users
    .receive(on: DispatchQueue.global(qos: .userInitiated))
    .sink(receiveCompletion: { completion in
        print(completion)
    }, receiveValue: { value in
        print(value)
    })

print("run")

The output will be:

[User(name: "John"), User(name: "Jack")]
run
finished

But I was expecting to get:

run
[User(name: "John"), User(name: "Jack")]
finished

I expected this because the sink should run its closures on a background thread. What am I missing here? Do I need to run this code:

 sleep(5)
 promise(.success(true))

on a background thread? If so, what is the purpose of

.receive(on: DispatchQueue.global(qos: .userInitiated))

Solution

  • Your Future runs as soon as it's created, so in your case as soon as this property is accessed:

    var importUsers: Future<Bool, Never> {
      Future { promise in
        sleep(5)
        promise(.success(true))
      }
    }
    

    And since the Future runs immediately, the closure passed to the Future's initializer is executed right away, making the main thread sleep for 5 seconds before it moves on. In your case the Future is created as soon as you access users, which happens on the main thread.
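
A minimal sketch of that eager behavior, assuming a hypothetical `log` array that exists only to record the order of events. No subscriber is ever attached, yet the closure has already run by the time the next line executes:

```swift
import Combine

// Hypothetical log used only to record the order of events.
var log: [String] = []

// Nothing subscribes to this Future, yet its closure runs during init.
let eager = Future<Bool, Never> { promise in
    log.append("future body ran")
    promise(.success(true))
}

log.append("after creation")
print(log) // ["future body ran", "after creation"]
```

Replacing the `log.append` inside the closure with `sleep(5)` would therefore stall whichever thread creates the Future.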

    receive(on:) affects the thread on which sink (or any downstream operator) receives values, not the thread on which they are produced. Since the futures have already completed by the time you call .sink, the emitted value and the completion are delivered to the sink immediately. They arrive on a background queue, but still immediately.
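
The split between where the work runs and where the result is received can be checked with a small sketch. The flag names and the semaphore below are illustrative assumptions, not part of the original code; the thread is recorded in the completion handler because a completion is always delivered:

```swift
import Combine
import Foundation

// Hypothetical flags recording which thread each stage ran on.
var bodyRanOnMain = false
var downstreamRanOnMain = true
let done = DispatchSemaphore(value: 0)

// The closure runs on the creating thread, at creation time.
let future = Future<Int, Never> { promise in
    bodyRanOnMain = Thread.isMainThread
    promise(.success(42))
}

// receive(on:) only moves delivery of the value and completion.
let cancellable = future
    .receive(on: DispatchQueue.global(qos: .userInitiated))
    .sink(receiveCompletion: { _ in
        downstreamRanOnMain = Thread.isMainThread
        done.signal()
    }, receiveValue: { _ in })

done.wait() // block until the completion has been delivered
print(bodyRanOnMain, downstreamRanOnMain)
```

When this is run from the main thread, the first flag reflects the main thread and the second a background thread, which matches the explanation above: the operator changes where results arrive, not where the Future's closure executes.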

    After that, you finally hit the print("run") line.

    If you replace the sleep(5) bit with this:

    var importUsers: Future<Bool, Never> {
      Future { promise in
        DispatchQueue.global().asyncAfter(deadline: .now() + 5) {
          promise(.success(true))
        }
      }
    }
    

    and make some minor tweaks to your subscription code:

    import PlaygroundSupport
    
    PlaygroundPage.current.needsIndefiniteExecution = true
    
    var cancellables = Set<AnyCancellable>()
    
    users
      .receive(on: DispatchQueue.global(qos: .userInitiated))
      .sink(receiveCompletion: { completion in
        print(completion)
      }, receiveValue: { value in
        print(value)
      }).store(in: &cancellables)
    

    You'll see that the output is printed as expected because that initial future doesn't block the main thread for five seconds.

    Alternatively, if you keep the sleep and subscribe like this, you would see the output in the same order:

    import PlaygroundSupport
    
    PlaygroundPage.current.needsIndefiniteExecution = true
    
    var cancellables = Set<AnyCancellable>()
    
    users
      .subscribe(on: DispatchQueue.global(qos: .userInitiated))
      .sink(receiveCompletion: { completion in
        print(completion)
      }, receiveValue: { value in
        print(value)
      }).store(in: &cancellables)
    

    The reason is that you subscribe on a background queue, so the subscription is set up asynchronously, off the main thread, which lets print("run") execute before the Future's result is received. However, the main thread still sleeps for 5 seconds as soon as the users property is accessed (on the main thread), because that is when the Future is initialized. So the entire output is printed all at once after that delay, not with a 5 second pause after "run".
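
If the goal is to keep the blocking work itself off the main thread, one commonly used option (not part of the answer above, so treat it as a sketch under assumptions) is to wrap the Future in Combine's `Deferred`, so that it is only created when a subscriber arrives, and then combine it with `subscribe(on:)`. The `EventLog` helper and the shortened sleep are hypothetical and exist only for the demonstration:

```swift
import Combine
import Foundation

// Hypothetical thread-safe log, used only to observe what happened.
final class EventLog {
    private let lock = NSLock()
    private var entries: [String] = []
    func record(_ entry: String) {
        lock.lock(); defer { lock.unlock() }
        entries.append(entry)
    }
    var snapshot: [String] {
        lock.lock(); defer { lock.unlock() }
        return entries
    }
}

let log = EventLog()
let done = DispatchSemaphore(value: 0)

// Deferred postpones creating the Future until subscription time.
let deferredImport = Deferred {
    Future<Bool, Never> { promise in
        log.record("import on main thread: \(Thread.isMainThread)")
        Thread.sleep(forTimeInterval: 0.2) // stands in for sleep(5)
        promise(.success(true))
    }
}

// subscribe(on:) performs the subscription, and therefore the
// Future's creation and its blocking work, on the background queue.
let cancellable = deferredImport
    .subscribe(on: DispatchQueue.global(qos: .userInitiated))
    .sink(receiveCompletion: { _ in done.signal() },
          receiveValue: { _ in log.record("value") })

log.record("run") // the calling thread is not blocked by the sleep
done.wait()
print(log.snapshot)
```

With this arrangement the sleep happens on the queue passed to `subscribe(on:)`, so the calling thread can continue immediately, and the value arrives only after the delay.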