DispatchGroup and OperationQueue have the methods wait() and waitUntilAllOperationsAreFinished(), which wait for all operations in the respective queues to complete.
But even when I call cancelAllOperations, it just sets the isCancelled flag on every running operation and stops the queue from starting new operations. It still waits for the running operations to complete. Therefore running operations must be stopped from the inside. But that is possible only if the operation is incremental or has an inner loop of some kind. When it's just one long external request (a web request, for example), the isCancelled variable is of no use.
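To illustrate the cooperative cancellation described above, here is a minimal sketch. The step count and sleep intervals are made up for the example and only stand in for real work. It shows that cancelAllOperations only flips the flag, and the wait returns early only because the block polls isCancelled itself.

```swift
import Foundation

let queue = OperationQueue()
let operation = BlockOperation()

// The work is split into small steps so the flag can be polled between them.
operation.addExecutionBlock { [unowned operation] in
    for _ in 0..<1_000 {
        // cancel() only sets this flag; the block has to check it itself.
        if operation.isCancelled { return }
        Thread.sleep(forTimeInterval: 0.01) // stands in for one unit of real work
    }
}

let started = Date()
queue.addOperation(operation)

Thread.sleep(forTimeInterval: 0.05)        // let a few steps run
queue.cancelAllOperations()                // sets isCancelled on the operation
queue.waitUntilAllOperationsAreFinished()  // returns soon, because the loop bails out

let elapsed = Date().timeIntervalSince(started)
print("finished after \(elapsed) s, cancelled: \(operation.isCancelled)")
```

Without the isCancelled check inside the loop, the same wait would block for the full ten seconds of simulated work, which is exactly the problem with a single long request.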
Is there any way to stop the OperationQueue or DispatchGroup from waiting for the operations to complete if one of the operations decides that the whole queue is now outdated?
The practical case is: mapping a request to a list of responders, where it is known that only one may answer. When that happens, the queue should stop waiting for the other operations to finish and unblock the thread.
Edit: DispatchGroup and OperationQueue usage is not obligatory; these are just tools I thought would fit.
OK, so I think I came up with something. Results are stable, I've just tested. The answer is just one semaphore :)
let semaphore = DispatchSemaphore(value: 0)
let group = DispatchGroup()
let queue = DispatchQueue(label: "map-reduce", qos: .userInitiated, attributes: .concurrent)
// serial queue that guards every write to `result`
let resultQueue = DispatchQueue(label: "map-reduce.result")
let stopAtFirst = true // false for all results to be appended into one array
let values: [U] = <some input values>
let mapper: (U) throws -> T? = <closure>
var result: [T?] = []
for value in values {
    queue.async(group: group) {
        do {
            let res = try mapper(value)
            // appending must always be thread-safe, otherwise you end up with a race condition and unstable results;
            // DispatchQueue.global() is concurrent and does not serialize anything, so a serial queue is used here
            resultQueue.sync {
                result.append(res)
            }
            if stopAtFirst && res != nil {
                semaphore.signal() // first answer arrived: release the waiting thread
            }
        } catch let error {
            print("Could not map value \"\(value)\" to mapper \(mapper): \(error)")
        }
    }
}
group.notify(queue: queue) { // this must be declared right after submitting all tasks, otherwise the notification fires instantly
    semaphore.signal() // all tasks finished, whether or not any of them answered
}
if semaphore.wait(timeout: .now() + 5) == .timedOut {
    print("MapReduce timed out on values \(values)")
}
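For reference, the same idea can be wrapped into a self-contained, runnable sketch. The names firstResult and FirstResultBox, the Sendable constraints, and the sample responders are assumptions made for this example and are not part of the snippet above. The remaining tasks are not cancelled; the calling thread just stops waiting for them.

```swift
import Foundation

// Hypothetical thread-safe holder for the first non-nil result.
final class FirstResultBox<T: Sendable>: @unchecked Sendable {
    private let lock = NSLock()
    private var stored: T?

    // Keeps only the first value offered; later ones are ignored.
    func offer(_ value: T) {
        lock.lock()
        defer { lock.unlock() }
        if stored == nil { stored = value }
    }

    var value: T? {
        lock.lock()
        defer { lock.unlock() }
        return stored
    }
}

// Hypothetical helper: returns the first non-nil mapper result, or nil if
// every mapper returned nil or the timeout elapsed.
func firstResult<U: Sendable, T: Sendable>(
    of values: [U],
    timeout: TimeInterval,
    mapper: @escaping @Sendable (U) -> T?
) -> T? {
    let semaphore = DispatchSemaphore(value: 0)
    let group = DispatchGroup()
    let workQueue = DispatchQueue(label: "map-reduce", qos: .userInitiated, attributes: .concurrent)
    let box = FirstResultBox<T>()

    for value in values {
        workQueue.async(group: group) {
            guard let mapped = mapper(value) else { return }
            box.offer(mapped)
            semaphore.signal() // a responder answered: release the waiting thread
        }
    }
    // Fires once every task has finished, covering the "nobody answered" case.
    group.notify(queue: workQueue) { semaphore.signal() }

    _ = semaphore.wait(timeout: .now() + timeout)
    return box.value
}

// Usage: only the responder for 3 answers; the slow ones are not waited for.
let started = Date()
let answer = firstResult(of: [1, 2, 3, 4], timeout: 5) { (n: Int) -> String? in
    if n == 3 { return "responder \(n)" }
    Thread.sleep(forTimeInterval: 1) // slow responders that never answer
    return nil
}
let elapsed = Date().timeIntervalSince(started)
print(answer ?? "no answer", "after", elapsed, "s")
```

Reading the result through the same lock that guards the write avoids racing with tasks that are still running after the semaphore has been signalled.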