import Combine
import Foundation

func synchronized<T>(_ lock: NSLock, closure: () throws -> T) rethrows -> T {
    lock.lock()
    defer { lock.unlock() } // Release the lock even if the closure throws
    return try closure()
}
@objcMembers class Main : NSObject {
    static var lock = NSLock()
    static var subscribers: [Int: AnyCancellable] = [:]
    static var signal = DispatchSemaphore(value: 0)
    static func perform() {
        for i in 0 ..< 5 {
            Task {
                await subscribe(i)
            }
        }
        Task {
            try! await Task.sleep(for: .seconds(1))
            BackgroundNetwork.shared.results.append(1)
        }
        signal.wait()
    }
}

// Simulate network request, make a network request and wait for the response
func subscribe(_ i: Int) async {
    await withCheckedContinuation { ctx in
        let subscriber = BackgroundNetwork.shared.dataSource().first().sink { newValue in
            print("Pos\(i). \(newValue)")
            ctx.resume()
        }
        synchronized(Main.lock) {
            Main.subscribers[i] = subscriber
        }
    }
}

class BackgroundNetwork : ObservableObject {
    static var shared = BackgroundNetwork()
    @Published var results: [Int] = []
    func dataSource() -> AnyPublisher<[Int], Never> {
        return $results
            .dropFirst() // Prevent the current value from being published
            .eraseToAnyPublisher()
    }
}
I was expecting to receive five publications after executing BackgroundNetwork.shared.results.append(1), but in reality, the program might receive anywhere from one to five publications when it runs. What could be the problem with the code? I've already used locks to protect areas where there could be access conflicts.
As others said in the comments, your code is a bit confusing because it combines several different technologies. All of them aim to simplify concurrency, but each has a very different model, and mixing them in the same code as you have done here is unusual.
Let me try reworking your code using Combine and then using async/await, and see if that helps you.
First some basic definitions:
import Combine
import Foundation

enum NetworkError: Error {
    case networkError(Error)
}

typealias NetworkResult = Result<String, NetworkError>
In my code there is only one type of error (a network error), and for each network request I want to collect a Result that tells me whether the request succeeds or fails. I'm going to do 5 requests and collect these Result instances. For each technology there will be a function that sends a mock request, and some code that sends 5 of them and collects the results.
// Creates a mock network request that finishes after 1 - 5 seconds.
func mockRequestFromNetworkWithCombine(returning result: String) -> Future<NetworkResult, Never> {
    return Future<NetworkResult, Never> { promise in
        let seconds = (1...5).randomElement()!
        let finish = DispatchTime.now().advanced(by: .seconds(seconds))
        DispatchQueue.global(qos: .background).asyncAfter(deadline: finish) {
            promise(.success(.success(result)))
        }
    }
}

func sendManyCombineRequests() -> AnyPublisher<[NetworkResult], Never> {
    (0..<5)
        .publisher
        .flatMap { index in mockRequestFromNetworkWithCombine(returning: "Combine Result \(index)") }
        .collect()
        .eraseToAnyPublisher()
}

// Keep a reference to the subscription; releasing it cancels the pipeline.
let subscription = sendManyCombineRequests()
    .sink { results in
        debugPrint(results)
    }
For mockRequestFromNetworkWithCombine I've mixed my technologies a little (Dispatch with Combine). Since it's a mock example, I thought that would be OK.
The function returns a Future, which is a Publisher that is expected to generate a result exactly once. Things get a little weird here because I want a Publisher that is going to return a Result. Normally a Future itself represents a result (either a successful return value or an error). In my case every request I execute generates a Result, but the Future itself always produces a Result; it never fails.
This leads to the very unusual-looking promise(.success(.success(result))), which means my future succeeded, returning a network result that itself also succeeded.
sendManyCombineRequests is the bit that sends lots of requests. I generate the requests using the .publisher of a range. I use flatMap because for each index I want to issue a request to the publisher (the Future) that my mock request returns and do something with the results of those publishers. I use the collect publisher operator to gather all the results into a single array, and eraseToAnyPublisher erases the nested types that all those operators create. The result is a Publisher that, when subscribed to, will issue 5 network requests, collect a Result for each into an array, and then publish that array.
The code below that, which calls sendManyCombineRequests, shows how you might use the Publisher it creates.
// Creates a mock network request that finishes after 1 - 5 seconds.
func mockRequestFromNetworkWithConcurrency(returning result: String) async throws -> String {
    let seconds = (1...5).randomElement()!
    try await Task.sleep(for: .seconds(seconds))
    return result
}

func sendManyConcurrentRequests() async -> [NetworkResult] {
    await withTaskGroup(of: NetworkResult.self, returning: [NetworkResult].self) { group in
        for i in (0..<5) {
            group.addTask {
                do {
                    let result = try await mockRequestFromNetworkWithConcurrency(returning: "Async/Await Request \(i)")
                    return .success(result)
                } catch {
                    return NetworkResult.failure(.networkError(error))
                }
            }
        }

        // Gather the results as each child task finishes.
        var results = Array<NetworkResult>()
        for await result in group {
            results.append(result)
        }
        return results
    }
}

let networkTask = Task {
    let results = await sendManyConcurrentRequests()
    debugPrint(results)
}
For the Swift Concurrency version, my mock network request function is async. It picks a random wait time, hangs around until it's passed, then returns a result. If any error had occurred, it could have thrown an error representing the problem.
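To make the throwing path concrete, here is a minimal sketch of a mock request that can actually fail, plus a wrapper that turns the thrown error into a NetworkResult. The names MockTimeout, flakyMockRequest, requestAsResult and the shouldFail parameter are hypothetical, introduced only for this illustration, and the delay is shortened to 10 ms.

```swift
enum NetworkError: Error {
    case networkError(Error)
}

typealias NetworkResult = Result<String, NetworkError>

// Hypothetical error type, used only for this illustration.
struct MockTimeout: Error {}

// Hypothetical variant of the mock request that fails on demand.
func flakyMockRequest(returning result: String, shouldFail: Bool) async throws -> String {
    try await Task.sleep(nanoseconds: 10_000_000) // 10 ms stand-in for network latency
    if shouldFail {
        throw MockTimeout()
    }
    return result
}

// Converts the throwing call into a NetworkResult instead of propagating the error.
func requestAsResult(_ text: String, shouldFail: Bool) async -> NetworkResult {
    do {
        let value = try await flakyMockRequest(returning: text, shouldFail: shouldFail)
        return .success(value)
    } catch {
        return .failure(.networkError(error))
    }
}
```

Calling requestAsResult from an async context gives a value you can switch over, so the caller never needs its own do/catch.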
sendManyConcurrentRequests is also async. It runs 5 network requests, collects their results, and returns an array of them.
First it creates a task group to contain the 5 requests. Again, it uses a range (this time in a for loop) and for each index adds a task that makes the network request and returns either success or failure.
It then uses "for await result in group" to wait until all five of the network request tasks return some kind of result. It collects the results in an array as they come in then returns the array.
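Because the results are collected as they come in, the array follows completion order, not request order. If request order matters, one option is to tag each child task's result with its index. The sketch below assumes a hypothetical mockRequest(index:) whose delay shrinks as the index grows, and a hypothetical sendManyOrderedRequests that puts each result back into its slot.

```swift
// Hypothetical mock: later indices get shorter delays, so they tend to finish first.
func mockRequest(index: Int) async -> String {
    let delay = UInt64(5 - index) * 20_000_000 // 100 ms down to 20 ms
    try? await Task.sleep(nanoseconds: delay)
    return "Request \(index)"
}

func sendManyOrderedRequests() async -> [String] {
    await withTaskGroup(of: (Int, String).self, returning: [String].self) { group in
        for i in 0..<5 {
            group.addTask {
                let value = await mockRequest(index: i)
                return (i, value) // Tag the result with the index of its request
            }
        }

        // Results arrive in completion order; the index restores request order.
        var slots = [String?](repeating: nil, count: 5)
        for await (index, value) in group {
            slots[index] = value
        }
        return slots.compactMap { $0 }
    }
}
```

The slots array is only touched inside the for await loop, which runs in the parent task, so this still needs no lock.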
The bit at the bottom shows how you might create a Task that calls sendManyConcurrentRequests.
If these examples had "real" error handling they might be a bit more complex, but not by much. This code tries to run all 5 requests simultaneously. If you wanted to run them sequentially you would have to change things a bit.
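As a rough sketch of the sequential variant with Swift Concurrency, you could drop the task group and await each request in a plain loop. The function names mockRequest and sendRequestsSequentially are hypothetical, and the mock uses a short fixed delay so the example finishes quickly.

```swift
enum NetworkError: Error {
    case networkError(Error)
}

typealias NetworkResult = Result<String, NetworkError>

// Hypothetical stand-in for the mock request, with a short fixed delay.
func mockRequest(returning result: String) async throws -> String {
    try await Task.sleep(nanoseconds: 10_000_000) // 10 ms
    return result
}

func sendRequestsSequentially() async -> [NetworkResult] {
    var results: [NetworkResult] = []
    for i in 0..<5 {
        do {
            // Each await finishes before the next request starts.
            let value = try await mockRequest(returning: "Sequential Request \(i)")
            results.append(.success(value))
        } catch {
            results.append(.failure(.networkError(error)))
        }
    }
    return results
}
```

Here the results come back in request order by construction, at the cost of the total time being the sum of the individual delays.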
But note that neither of these examples requires a lock. In the Combine example, the array is put together by the collect operator that knows how to pull together an array of the results of async publishers. In the Swift Concurrency case, the Task Group manages the details of sequentially delivering the results in the for await loop.