import Dispatch
class SynchronizedArray<T> {
private var array: [T] = []
private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)
var get: [T] {
accessQueue.sync {
array
}
}
func append(newElement: T) {
accessQueue.async(flags: .barrier) {
self.array.append(newElement)
}
}
}
If I run the following code, 10,000 elements are appended to the array as expected even if I am reading concurrently:
DispatchQueue.concurrentPerform(iterations: 10000) { i in
_ = threadSafeArray.get
threadSafeArray.append(newElement: i)
}
But when I do this, it never comes close to adding 10,000 elements (it added only 92 elements on my computer the last time I ran it):
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
concurrent.async {
_ = threadSafeArray.get
threadSafeArray.append(newElement: i)
}
}
Why does the former work, and why doesn't the latter work?
It's good that you found a solution to the thread explosion. See the discussion of thread explosion in WWDC 2015 Building Responsive and Efficient Apps with GCD and again in WWDC 2016 Concurrent Programming With GCD in Swift 3.
That having been said, DispatchSemaphore is a bit of an anti-pattern nowadays, given the presence of concurrentPerform (or OperationQueue with its maxConcurrentOperationCount, or Combine with its maxPublishers). All of these manage the degree of concurrency more elegantly than dispatch semaphores.
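As a rough sketch of one of the alternatives just mentioned, here is how OperationQueue with maxConcurrentOperationCount might cap the concurrency at eight. The function name appendWithOperationQueue is hypothetical, the class is a trimmed-down copy of the one in the question, and the Sendable annotations are my own addition to keep newer compilers' concurrency checks quiet.

```swift
import Foundation

// Trimmed-down copy of the reader-writer class from the question.
// The Sendable annotations are an assumption, not part of the original.
final class SynchronizedArray<T: Sendable>: @unchecked Sendable {
    private var array: [T] = []
    private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)

    // Reads run concurrently with each other, but never alongside a barrier write.
    var count: Int { accessQueue.sync { array.count } }

    // Writes use a barrier so they have exclusive access to the array.
    func append(newElement: T) {
        accessQueue.async(flags: .barrier) { self.array.append(newElement) }
    }
}

// Hypothetical helper: appends 0..<iterations with at most eight operations in flight.
func appendWithOperationQueue(iterations: Int) -> Int {
    let threadSafeArray = SynchronizedArray<Int>()

    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 8

    for i in 0..<iterations {
        queue.addOperation { threadSafeArray.append(newElement: i) }
    }

    // Blocks the calling thread until every operation has run;
    // in an app you would avoid doing this on the main thread.
    queue.waitUntilAllOperationsAreFinished()

    // Every barrier write was enqueued before this sync read, so the read sees all of them.
    return threadSafeArray.count
}

let operationQueueCount = appendWithOperationQueue(iterations: 10_000)
print(operationQueueCount)
```

No semaphore is needed here: the queue itself limits how many operations run at once.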
All that having been said, a few observations on your semaphore pattern:
When using this DispatchSemaphore pattern, you generally put the wait before the concurrent.async { ... } (because, as written, you're getting nine concurrent operations, not eight, which is a bit misleading).
The deeper problem here is that you've reduced the count issue, but not eliminated it. Consider:
let threadSafeArray = SynchronizedArray<Int>()
let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 8)
for i in 0..<10000 {
semaphore.wait()
concurrent.async {
threadSafeArray.append(newElement: i)
semaphore.signal()
}
}
print(threadSafeArray.get.count)
When you leave the for loop, up to eight of the async tasks on concurrent can still be running, and the count (unsynchronized with respect to the concurrent queue) can still be less than 10,000. You have to add another concurrent.async(flags: .barrier) { ... }, which is just adding a second layer of synchronization. E.g.:
let semaphore = DispatchSemaphore(value: 8)
for i in 0..<10000 {
semaphore.wait()
concurrent.async {
threadSafeArray.append(newElement: i)
semaphore.signal()
}
}
concurrent.async(flags: .barrier) {
print(threadSafeArray.get.count)
}
Or you can use a DispatchGroup, the classical mechanism for determining when a series of asynchronously dispatched blocks finish:
let semaphore = DispatchSemaphore(value: 8)
let group = DispatchGroup()
for i in 0..<10000 {
semaphore.wait()
concurrent.async(group: group) {
threadSafeArray.append(newElement: i)
semaphore.signal()
}
}
group.notify(queue: .main) {
print(threadSafeArray.get.count)
}
Using concurrentPerform eliminates the need for either of these patterns because it won’t continue execution until all of the concurrent tasks are done. (It will also automatically optimize the degree of concurrency for the number of cores on your device.)
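To illustrate the point about concurrentPerform, here is a minimal sketch with no semaphore, group, or extra barrier. The helper name appendWithConcurrentPerform is hypothetical, and the class is a trimmed-down copy of the one from the question with Sendable annotations that I have added as an assumption about newer compilers.

```swift
import Dispatch

// Trimmed-down copy of the reader-writer class from the question.
// The Sendable annotations are an assumption, not part of the original.
final class SynchronizedArray<T: Sendable>: @unchecked Sendable {
    private var array: [T] = []
    private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)

    var count: Int { accessQueue.sync { array.count } }

    func append(newElement: T) {
        accessQueue.async(flags: .barrier) { self.array.append(newElement) }
    }
}

// Hypothetical helper: appends 0..<iterations and returns the resulting count.
func appendWithConcurrentPerform(iterations: Int) -> Int {
    let threadSafeArray = SynchronizedArray<Int>()

    // concurrentPerform does not return until every iteration has finished,
    // so by the next line every append has already been enqueued.
    DispatchQueue.concurrentPerform(iterations: iterations) { i in
        threadSafeArray.append(newElement: i)
    }

    // The sync read queues up behind the pending barrier writes.
    return threadSafeArray.count
}

let concurrentPerformCount = appendWithConcurrentPerform(iterations: 10_000)
print(concurrentPerformCount)
```

Because concurrentPerform blocks the caller, you would typically dispatch the whole thing to a background queue rather than call it from the main thread.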
FWIW, a much better alternative to SynchronizedArray is to not expose the underlying array at all, and just implement whatever methods you want to expose, integrating the necessary synchronization. It makes for a cleaner call site and solves many issues.
For example, assuming you wanted to expose a subscript operator and a count variable, you would do:
class SynchronizedArray<T> {
private var array: [T]
private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent)
init(_ array: [T] = []) {
self.array = array
}
subscript(index: Int) -> T {
get { reader { $0[index] } }
set { writer { $0[index] = newValue } }
}
var count: Int {
reader { $0.count }
}
func append(newElement: T) {
writer { $0.append(newElement) }
}
func reader<U>(_ block: ([T]) throws -> U) rethrows -> U {
try accessQueue.sync { try block(array) }
}
func writer(_ block: @escaping (inout [T]) -> Void) {
accessQueue.async(flags: .barrier) { block(&self.array) }
}
}
This solves a variety of issues. For example, you can now do:
print(threadSafeArray.count) // get the count
print(threadSafeArray[500]) // get the 500th item
You can now also do things like:
let average = threadSafeArray.reader { array -> Double in
let sum = array.reduce(0, +)
return Double(sum) / Double(array.count)
}
But, bottom line, when dealing with collections (or any mutable object), you invariably do not want to expose the mutable object itself, but rather write your own synchronized methods for common operations (subscripts, count, removeAll, etc.), and possibly also expose the reader/writer interface for those cases where the app developer might need a broader synchronization mechanism.
(FWIW, the changes to this SynchronizedArray apply to both the semaphore and concurrentPerform scenarios; it is just that the semaphore happens to manifest the problem in this case.)
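As a sketch of what "synchronized methods for common operations" could look like, the following adds first, isEmpty, and removeAll to the reader/writer design, and uses writer for a compound check-then-append that has to happen atomically. The extra members, the @Sendable on the writer block, and the Sendable constraints are my assumptions, not part of the class shown earlier.

```swift
import Dispatch

final class SynchronizedArray<T: Sendable>: @unchecked Sendable {
    private var array: [T]
    private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent)

    init(_ array: [T] = []) {
        self.array = array
    }

    // Common read operations, each wrapped in a synchronous read.
    var count: Int { reader { $0.count } }
    var first: T? { reader { $0.first } }
    var isEmpty: Bool { reader { $0.isEmpty } }

    // Common write operations, each wrapped in an asynchronous barrier write.
    func append(newElement: T) { writer { $0.append(newElement) } }
    func removeAll() { writer { $0.removeAll() } }

    // Broader interface for multi-step reads that must see one consistent snapshot.
    func reader<U>(_ block: ([T]) throws -> U) rethrows -> U {
        try accessQueue.sync { try block(array) }
    }

    // Broader interface for multi-step writes that must not interleave with other access.
    func writer(_ block: @escaping @Sendable (inout [T]) -> Void) {
        accessQueue.async(flags: .barrier) { block(&self.array) }
    }
}

let values = SynchronizedArray([1, 2, 3])
values.append(newElement: 4)
let countAfterAppend = values.count

// Check-then-append as one barrier block, so no other write can slip in between.
values.writer { array in
    if !array.contains(5) { array.append(5) }
}
let countAfterConditionalAppend = values.count

values.removeAll()
let isEmptyAfterRemoveAll = values.isEmpty
print(countAfterAppend, countAfterConditionalAppend, isEmptyAfterRemoveAll)
```

Doing the contains check and the append as two separate calls would leave a window in which another thread could append the same value, which is the sort of case the broader reader/writer interface is for.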
Needless to say, you would generally have more work being done on each thread, too, because as modest as the context-switching overhead is, it is likely enough here to offset any advantages gained from parallel processing. (But I understand that this was likely just a conceptual demonstration of a problem, not a proposed implementation.) Just an FYI to future readers.
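One way to get more work onto each thread is to have each concurrentPerform iteration handle a chunk of indices and synchronize once per chunk rather than once per element. This is only a sketch under my own assumptions: append(contentsOf:), appendInChunks, and the chunk size of 1,000 are hypothetical, and the right chunk size depends on the actual workload.

```swift
import Dispatch

final class SynchronizedArray<T: Sendable>: @unchecked Sendable {
    private var array: [T] = []
    private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent)

    var count: Int { accessQueue.sync { array.count } }

    // Hypothetical addition: append a whole batch with a single barrier block.
    func append(contentsOf newElements: [T]) {
        accessQueue.async(flags: .barrier) { self.array.append(contentsOf: newElements) }
    }
}

// Hypothetical helper: computes squares of 0..<total in chunks and returns the final count.
func appendInChunks(total: Int, chunkSize: Int) -> Int {
    let threadSafeArray = SynchronizedArray<Int>()
    let chunkCount = (total + chunkSize - 1) / chunkSize   // round up so the tail is covered

    DispatchQueue.concurrentPerform(iterations: chunkCount) { chunk in
        let start = chunk * chunkSize
        let end = min(start + chunkSize, total)

        // Do the per-element work locally, without touching shared state.
        let squares = (start..<end).map { $0 * $0 }

        // One synchronized write per chunk instead of one per element.
        threadSafeArray.append(contentsOf: squares)
    }

    return threadSafeArray.count
}

let chunkedCount = appendInChunks(total: 10_000, chunkSize: 1_000)
print(chunkedCount)
```

Note that the chunks land in the array in whatever order they finish, so the elements are not guaranteed to be sorted by index.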