
How to avoid thread explosion when using DispatchSemaphores?


In order to limit access to a common operation to a fixed number of threads, we can use DispatchSemaphore from Dispatch (GCD). For example:

import Foundation

// Allow at most 5 operations to run at the same time.
let semaphore = DispatchSemaphore(value: 5)

func someFunction(operation: @escaping () -> Void) {
    DispatchQueue.global().async {
        // A worker thread is already in use at this point; if all 5 slots
        // are taken, this thread simply sits here, blocked.
        semaphore.wait()

        operation()

        semaphore.signal()
    }
}

// Called 64 times, e.g.:
for i in 1...64 {
    someFunction {
        print("Executing add operation \(i)")
    }
}

The problem here is that the semaphore allows only 5 concurrent operations, but every call still dispatches a block to the global queue, and each of those blocks occupies a worker thread while it waits for one of the first 5 operations to finish. Those threads do nothing but block, which wastes them. Also, GCD's worker thread pool is limited to 64 threads, so blocking many of them can exhaust the pool, a problem commonly referred to as thread explosion.

How do we avoid thread explosion here and dispatch only as many threads as can actually execute, rather than threads that immediately block?


Solution

  • The thread-explosion problem stems from the fact that the code waits inside the block dispatched to the global concurrent queue. That means the code has already grabbed a worker thread before it waits for the semaphore, which can quickly exhaust the limited worker thread pool.

    To avoid this problem with semaphores, you should wait before the dispatch to the concurrent queue. But, you probably do not want to wait on the current thread, as that will block the caller. So, I might create a “scheduler” (serial) queue to manage all of the jobs being added to some “processor” (concurrent) queue:

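    let schedulerQueue = DispatchQueue(label: …)                          // serial: the only queue that blocks
    let processorQueue = DispatchQueue(label: …, attributes: .concurrent) // runs the actual work
    let semaphore = DispatchSemaphore(value: 5)                           // at most 5 jobs at a time
    
    func start(block: @escaping () -> Void) {
        schedulerQueue.async {
            semaphore.wait()          // wait for a free slot *before* dispatching the job
            processorQueue.async {
                block()
                semaphore.signal()    // free the slot when the job finishes
            }
        }
    }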
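A self-contained, runnable sketch of the scheduler/processor pattern follows. The queue labels, the `ConcurrencyTracker` helper, the `DispatchGroup` (used only so the demo can wait for all jobs), and the 10 ms `Thread.sleep` standing in for synchronous work are illustrative assumptions, not part of the original answer. It submits 64 jobs and records the highest number that ran at once, which should never exceed the semaphore's value of 5.

```swift
import Foundation

/// Counts how many jobs run at the same time (hypothetical helper, only for observing the limit).
final class ConcurrencyTracker: @unchecked Sendable {
    private let lock = NSLock()
    private var current = 0
    private var maxSeen = 0

    func enter() {
        lock.lock(); defer { lock.unlock() }
        current += 1
        maxSeen = max(maxSeen, current)
    }

    func leave() {
        lock.lock(); defer { lock.unlock() }
        current -= 1
    }

    var peak: Int {
        lock.lock(); defer { lock.unlock() }
        return maxSeen
    }
}

let schedulerQueue = DispatchQueue(label: "example.scheduler")   // hypothetical label
let processorQueue = DispatchQueue(label: "example.processor", attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 5)
let group = DispatchGroup()   // only used so the demo can wait for completion
let tracker = ConcurrencyTracker()

func start(block: @escaping () -> Void) {
    group.enter()
    schedulerQueue.async {
        // Only the single serial scheduler thread blocks here,
        // not one worker thread per pending job.
        semaphore.wait()
        processorQueue.async {
            block()
            semaphore.signal()
            group.leave()
        }
    }
}

// Submit 64 synchronous jobs; at most 5 should run at once.
for _ in 0..<64 {
    start {
        tracker.enter()
        Thread.sleep(forTimeInterval: 0.01)   // simulated synchronous work
        tracker.leave()
    }
}

group.wait()
print("peak concurrency:", tracker.peak)
```

Because `wait()` runs on the serial scheduler queue, pending jobs are just closures queued on that queue rather than blocked worker threads; only the scheduler's one thread ever blocks.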
    

    If the work being initiated by block is, itself, asynchronous (e.g., a network request), then you would just move the semaphore.signal() into the completion handler of that asynchronous work item.
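For the asynchronous case, one possible sketch passes a `done` closure into each job, and the semaphore is signaled only when that closure is called from the completion handler. It assumes a hypothetical `fetchItem(_:completion:)` API, simulated here with `asyncAfter`; the `InFlightCounter` helper, the queue labels, and the 10 ms delay are likewise illustrative.

```swift
import Foundation

/// Counts requests in flight (hypothetical helper, only for observing the limit).
final class InFlightCounter: @unchecked Sendable {
    private let lock = NSLock()
    private var current = 0
    private var maxSeen = 0

    func enter() { lock.lock(); current += 1; maxSeen = max(maxSeen, current); lock.unlock() }
    func leave() { lock.lock(); current -= 1; lock.unlock() }
    var peak: Int { lock.lock(); defer { lock.unlock() }; return maxSeen }
}

/// Hypothetical asynchronous API (stand-in for, e.g., a network request):
/// it returns immediately and calls `completion` later on another thread.
func fetchItem(_ id: Int, completion: @escaping () -> Void) {
    DispatchQueue.global().asyncAfter(deadline: .now() + .milliseconds(10)) {
        completion()
    }
}

let schedulerQueue = DispatchQueue(label: "example.scheduler")
let processorQueue = DispatchQueue(label: "example.processor", attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 5)
let group = DispatchGroup()   // only used so the demo can wait for completion
let counter = InFlightCounter()

/// Like `start(block:)`, but the block receives a `done` closure that it must
/// call once its asynchronous work has actually finished.
func start(block: @escaping (_ done: @escaping () -> Void) -> Void) {
    group.enter()
    schedulerQueue.async {
        semaphore.wait()
        processorQueue.async {
            block {
                semaphore.signal()   // signaled from the completion handler
                group.leave()
            }
        }
    }
}

for id in 0..<64 {
    start { done in
        counter.enter()
        fetchItem(id) {
            counter.leave()
            done()
        }
    }
}

group.wait()
print("peak requests in flight:", counter.peak)
```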


    For the sake of completeness, note that we would often avoid semaphores altogether. There are several alternatives:

    1. One easy option is OperationQueue, which can control the degree of concurrency with its maxConcurrentOperationCount:

      let processorQueue = OperationQueue()
      processorQueue.name = …
      processorQueue.maxConcurrentOperationCount = 5
      
      func start(block: @escaping () -> Void) {
          processorQueue.addOperation {
              block()
          } 
      }
      

      Admittedly, if the work associated with this operation is, itself, asynchronous, then you have to implement a custom asynchronous Operation subclass. Such an operation does not finish until its asynchronous work completes, which lets the queue's concurrency limit and dependencies apply to asynchronous tasks (as contemplated in Trying to Understand Asynchronous Operation Subclass). This can be daunting if you have not done it before, but it is an elegant, if legacy, technique for managing dependencies between operations that are, themselves, asynchronous. If the work is synchronous, though, operation queues let you constrain parallelism with great ease.

    2. Another classic way to avoid thread explosion is concurrentPerform:

      DispatchQueue.global().async {
          DispatchQueue.concurrentPerform(iterations: 100) { index in
              …
          }
      }
      

      This is appropriate if (a) you are launching all the tasks at one point in time (i.e., not adding more later); and (b) you just want to constrain the degree of concurrency to the number of cores available on your CPU rather than to an arbitrary value, e.g. 5. This is very useful for massively parallel computation patterns, where you simply want to avoid overcommitting the CPU. See Long cycle blocks application for an example.

    3. Another way to tackle this is Combine’s flatMap(maxPublishers:) pattern, as outlined in Combine framework serialize async operations.

    4. Yet another way is Swift concurrency. E.g., you might have an AsyncChannel (from the Swift Async Algorithms package) to manage the queue of tasks as they come in, and a task group to constrain the degree of concurrency (as outlined in the latter part of Make tasks in Swift concurrency run serially).
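A minimal sketch of the task-group half of the Swift concurrency approach follows, assuming all jobs are known up front (the AsyncChannel part needs the external Swift Async Algorithms package, so it is omitted). The `process(_:gauge:)` job, the `Gauge` actor used to observe concurrency, and the limit of 5 are hypothetical. The group starts 5 child tasks and then adds a new one only as each finishes, so no more than 5 run at a time. It uses top-level `await`, so it assumes Swift 5.7 or later in a `main.swift`-style script.

```swift
/// Tracks how many jobs are running at once (hypothetical helper for illustration).
actor Gauge {
    private var current = 0
    private(set) var peak = 0

    func enter() { current += 1; peak = max(peak, current) }
    func leave() { current -= 1 }
}

/// Hypothetical async job standing in for real work (e.g., a network request).
func process(_ id: Int, gauge: Gauge) async -> Int {
    await gauge.enter()
    try? await Task.sleep(nanoseconds: 10_000_000)   // ~10 ms of simulated work
    await gauge.leave()
    return id * 2
}

/// Runs `process` for every id, keeping at most `maxConcurrent` child tasks in flight.
func processAll(_ ids: [Int], maxConcurrent: Int = 5) async -> (results: [Int], peak: Int) {
    let gauge = Gauge()
    let results = await withTaskGroup(of: Int.self, returning: [Int].self) { group in
        var collected: [Int] = []
        var pending = ids.makeIterator()

        // Start the first `maxConcurrent` jobs.
        for _ in 0..<maxConcurrent {
            guard let id = pending.next() else { break }
            group.addTask { await process(id, gauge: gauge) }
        }

        // Whenever a job finishes, start the next one, so no more than
        // `maxConcurrent` are ever running at the same time.
        while let result = await group.next() {
            collected.append(result)
            if let id = pending.next() {
                group.addTask { await process(id, gauge: gauge) }
            }
        }
        return collected
    }
    let peak = await gauge.peak
    return (results, peak)
}

let outcome = await processAll(Array(0..<64))
print("completed:", outcome.results.count, "peak concurrency:", outcome.peak)
```

Results arrive in completion order, not submission order; if order matters, one option is to return `(id, value)` pairs from the child tasks and sort afterwards.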

    All of these have advantages over the semaphore pattern insofar as they offer more robust cancelation patterns.
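As a small illustration of the cancelation point, here is a sketch using the OperationQueue option. After `cancelAllOperations()`, operations that have not started yet are marked cancelled and their blocks never run. With the semaphore pattern, by contrast, every block already dispatched to GCD will eventually run unless you add your own cancelation bookkeeping. The `RunCounter` helper, the 64 jobs, and the 50 ms simulated work are assumptions for the demo.

```swift
import Foundation

/// Thread-safe count of jobs that actually ran (hypothetical helper).
final class RunCounter: @unchecked Sendable {
    private let lock = NSLock()
    private var value = 0
    func increment() { lock.lock(); value += 1; lock.unlock() }
    var count: Int { lock.lock(); defer { lock.unlock() }; return value }
}

let queue = OperationQueue()
queue.maxConcurrentOperationCount = 5
let ran = RunCounter()

for _ in 0..<64 {
    queue.addOperation {
        ran.increment()
        Thread.sleep(forTimeInterval: 0.05)   // simulated synchronous work
    }
}

// Cancel everything: operations that have not started yet are skipped.
queue.cancelAllOperations()
queue.waitUntilAllOperationsAreFinished()
print("jobs that ran before cancelation:", ran.count)
```

Operations that were already executing still run to completion, because a block-based operation does not check `isCancelled` on its own.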

    Which of these is best depends upon several factors. Nowadays, Swift concurrency is often the “go to” solution; in a GCD codebase, operation queues and concurrentPerform are common legacy solutions.

    Unfortunately, the implementation details of these will vary a little depending upon the precise requirements, so it is hard to get much more specific than this. Notably, the details vary depending on (a) whether the individual jobs are synchronous or asynchronous; and (b) whether all the jobs will be added up front or whether you might be adding more later. Bottom line, to get more specific, we might need a few more details about the nature of what is going on in these parallel tasks. But hopefully, the above broadly outlines a few of the alternatives for constrained parallelism.