Tags: swift, concurrency, grand-central-dispatch, swift-concurrency

Split up Task into multiple concurrent subtasks


I am trying to do some calculations on a large number of objects. The objects are saved in an array and the results of the operation should be saved in a new array. To speed up the processing, I'm trying to break up the task into multiple subtasks which can run concurrently on different threads. The simplified example code below replaces the actual operation with two seconds of wait.

I have tried multiple ways of solving this issue, using both DispatchQueues and Tasks.

  1. Using DispatchQueue

    The basic setup I used is the following:

    import Foundation
    
    class Main {
        let originalData = ["a", "b", "c"]
        var calculatedData = Set<String>()
    
        func doCalculation() {
            //calculate length of array slices.
            let totalLength = originalData.count
            let sliceLength = Int(totalLength / 3)
    
            var start = 0
            var end = 0
    
            let myQueue = DispatchQueue(label: "Calculator", attributes: .concurrent)
    
            var allPartialResults = [Set<String>]()
    
            for i in 0..<3 {
                if i != 2 {
                    start = sliceLength * i
                    end = start + sliceLength - 1
                } else {
                    start = totalLength - sliceLength * (i - 1)
                    end = totalLength - 1
                }
    
                allPartialResults.append(Set<String>())
    
                myQueue.async {
                    allPartialResults[i] = self.doPartialCalculation(data: Array(self.originalData[start...end]))
                }
    
            }
    
            myQueue.sync(flags: .barrier) {
                for result in allPartialResults {
                    self.calculatedData.formUnion(result)
                }
            }
    
            //do further calculations with the data
        }
    
        func doPartialCalculation(data: [String]) -> Set<String> {
            print("began")
    
            sleep(2)
            let someResultSet: Set<String> = ["some result"]
    
            print("ended")
    
            return someResultSet
        }
    }
    

    As expected, the Console Log is the following (with all three "ended" appearing at once, two seconds after all three "began" appeared at once):

    began
    began
    began
    ended
    ended
    ended
    

    When measuring performance using os_signpost (and using real data and calculations), this approach reduces the time needed for the entire doCalculation() function to run from 40ms to around 14ms.

    Note that to avoid data races when merging the results into the final calculatedData set, I created an array of partial result sets, of which each dispatched block only accesses one index (a workaround I don't like, and the main reason I am not satisfied with this approach). What I would have liked to do is call DispatchQueue.main from within myQueue and add the new data to the calculatedData set on the main thread. However, calling DispatchQueue.main.sync causes a deadlock, and using the async version leads to the barrier flag not working as intended.

  2. Using Tasks

    In a second attempt, I tried using Tasks to run code concurrently. As I understand it, there are two options for running code concurrently with Tasks: async let and withTaskGroup. For the purpose of retrieving a variable number of partial results from a variable number of concurrent tasks, I figured withTaskGroup was the best option for me.

    I modified the code to look like this:

    class Main {
        let originalData = ["a", "b", "c"]
        var calculatedData = Set<String>()
    
        func doCalculation() async {
            //calculate length of array slices.
            let totalLength = originalData.count
            let sliceLength = Int(totalLength / 3)
    
            var start = 0
            var end = 0
    
            await withTaskGroup(of: Set<String>.self) { group in
                for i in 0..<3 {
                    if i != 2 {
                        start = sliceLength * i
                        end = start + sliceLength - 1
                    } else {
                        start = totalLength - sliceLength * (i - 1)
                        end = totalLength - 1
                    }
    
                    group.addTask {
                        return await self.doPartialCalculation(data: Array(self.originalData[start...end]))
                    }
                }
    
                for await newSet in group {
                    calculatedData.formUnion(newSet)
                }
            }
    
            //do further calculations with the data
        }
    
        func doPartialCalculation(data: [String]) async -> Set<String> {
            print("began")
    
        try? await Task.sleep(nanoseconds: 2_000_000_000) // two seconds, matching the GCD example
            let someResultSet: Set<String> = ["some result"]
    
            print("ended")
    
            return someResultSet
        }
    }
    

    However, the Console Log prints the following (with every "ended" coming 2 seconds after the preceding "began"):

    began
    ended
    began
    ended
    began
    ended
    

    Measuring performance using os_signpost revealed that the operation takes 40ms to complete. Therefore it is not running concurrently.

With that being said, what is the best course of action for this problem?

  • Using DispatchQueue, how do you call the Main Queue to avoid data races from within a queue, while at the same time preserving a barrier flag later on in the code?
  • Using Task, how can you actually make them run concurrently?

EDIT

Running the code on a real device instead of the simulator and changing the sleep function inside the Task from sleep() to Task.sleep(), I was able to achieve concurrent behavior, in that the Console prints the expected log. However, the operation time for the task remains upwards of 40-50ms and is highly variable, sometimes reaching 200ms or more. This problem remains after adding the .userInitiated priority to the Task.

Why does it take so much longer to run the same operation concurrently using Task compared to using DispatchQueue? Am I missing something?


Solution

  • A few observations:

    1. One possible performance difference is that the simulator artificially constrains the “cooperative thread pool” used by async-await. See Maximum number of threads with async-await task groups. This is one cause of a lack of full concurrency (on the simulator).

    2. In the async-await test, another factor that can affect concurrency is an actor. If an actor is enforcing serial execution, then consider declaring doPartialCalculation as nonisolated, so that it allows concurrent execution. Failure to do so can prevent any concurrent execution (with your sleep scenario, for example).

      The fact that you saw a significant performance difference when you went from sleep to Task.sleep makes me wonder if you might have done this within an actor. Actors are “reentrant”, and Task.sleep suspends execution, letting the actor switch to another task. So it allows concurrency for a series of async methods.

      But Task.sleep is not analogous to a computationally intensive task that ties up the thread. By declaring the function nonisolated, you can achieve concurrent execution for computationally intensive processes, with performance nearly equivalent to what you achieved with a GCD implementation.
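      For example, here is a minimal sketch (the actor and the names in it are hypothetical, not from your code) of how nonisolated restores concurrency when the compute method would otherwise be actor-isolated:

```swift
import Foundation

// Hypothetical sketch: if doPartialCalculation were isolated to this actor,
// the child tasks would serialize on the actor. Marking it `nonisolated`
// lets them run concurrently on the cooperative thread pool.
actor Calculator {
    func doCalculation(groups: Int) async -> Set<String> {
        await withTaskGroup(of: Set<String>.self) { group in
            for i in 0..<groups {
                group.addTask { self.doPartialCalculation(slice: i) }
            }

            var results = Set<String>()
            for await partial in group {
                results.formUnion(partial)
            }
            return results
        }
    }

    // nonisolated: no actor hop, so the group's tasks can overlap
    nonisolated func doPartialCalculation(slice: Int) -> Set<String> {
        ["result \(slice)"]   // stand-in for CPU-bound work
    }
}
```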

    3. That having been said, you might still find that async-await is a tiny bit slower than pure GCD implementations. Then again, Swift concurrency offers more native protections and compile-time warnings to ensure thread safety.

      E.g., here are 100 compute-heavy tasks in both GCD and async-await, performed twice for each:

      [chart: os_signpost intervals for the 100 GCD tasks vs. the 100 async-await tasks, two runs each]

      So, you simply have to ask yourself whether the benefits of async-await warrant the modest performance impact or not.


    A few unrelated asides on the GCD implementation:

    1. It should be noted that your GCD example is not thread-safe, so the comparison of your two code snippets is not entirely fair. You should make the GCD implementation thread-safe. (Perhaps consider temporarily testing with TSAN. See the “Detect Data Races Among Your App’s Threads” section of Diagnosing Memory, Thread, and Crash Issues Early.) You should perform doPartialCalculation in parallel, but you must synchronize the update of allPartialResults (or any shared resource). You can use a GCD serial queue for this. Or, since you seem to be so concerned about performance, perhaps an NSLock or os_unfair_lock (though care must be taken with the latter). See the GCD example at the end of this answer.
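      As a rough sketch of the lock-based alternative (NSLock in place of the serial queue; the function names mirror yours, but the slicing and the stand-in work are illustrative):

```swift
import Foundation

// Compute the slices in parallel, but hold the lock only for the brief
// moment the shared set is mutated.
func doCalculation(originalData: [String]) -> Set<String> {
    let iterations = 3
    let sliceLength = originalData.count / iterations
    let lock = NSLock()
    var allResults = Set<String>()

    DispatchQueue.concurrentPerform(iterations: iterations) { i in
        let start = i * sliceLength
        let end = i == iterations - 1 ? originalData.count : start + sliceLength

        let partial = doPartialCalculation(with: originalData[start..<end])

        lock.lock()                       // synchronize only the update
        allResults.formUnion(partial)
        lock.unlock()
    }

    return allResults
}

func doPartialCalculation<S>(with data: S) -> Set<String> where S: Sequence, S.Element == String {
    Set(data.map { $0.uppercased() })     // stand-in for the real work
}
```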

    2. If your dispatched blocks are taking ~50 msec, that simply might not be enough work to justify the overhead of concurrency. You may even find that a simple, serial, rendition is faster!

      Often, to maximize the amount of work done per thread, we would “stride” through our index (which is what you appear to be doing with your “slice” logic). But if, even after striding, the time per concurrent loop is still measured in milliseconds, then it may turn out that concurrency is unwarranted altogether. Some tasks are so trivial that they simply will not benefit from concurrent execution.

    3. In your GCD example, you are dispatching to a concurrent queue, which, if you have too many iterations, can lead to “thread explosion”, exhausting a very limited worker thread pool. You are only doing three iterations, so that’s not a problem now, but if the number of iterations grows, you would want to abandon that pattern and adopt concurrentPerform (as seen here). It’s a great way to make full use of the hardware capabilities while avoiding exhaustion of the worker thread pool.

    4. As an aside, I would be wary of using any of the sleep methods as a proxy for a time consuming task. You actually want to keep the CPU busy. I personally use an inefficient π calculation as my general proxy for “do something slow”. That is what I used above.

      func performHeavyTask(iteration: Int) {
          let id = OSSignpostID(log: poi)
          os_signpost(.begin, log: poi, name: #function, signpostID: id, "%d", iteration)
      
          let pi = calculatePi(iterations: 100_000_000)
      
          os_signpost(.end, log: poi, name: #function, signpostID: id, "%f", pi)
      }
      
      // calculate pi using Gregory-Leibniz series
      
      func calculatePi(iterations: Int) -> Double {
          var result = 0.0
          var sign = 1.0
          for i in 0 ..< iterations {
              result += sign / Double(i * 2 + 1)
              sign *= -1
          }
          return result * 4
      }
      

    E.g. here is a GCD example which

    • uses concurrentPerform;
    • performs calculation in parallel but synchronizes array updates;
    • performs update of model on main thread;
    • uses Sequence<String> rather than [String] to eliminate expensive array creation:
    func doCalculation() {
        DispatchQueue.global().async { [originalData] in                                // gives me the willies to see asynchronous routine accessing property, so I might capture it here in case it ever changes to mutable property; or, better, it should be parameter of `doCalculation`
            let totalLength = originalData.count
            let iterations = 3                                                          // avoid brittle pattern of repeating this number (of values based upon it) repeatedly
            let sliceLength = totalLength / iterations
            let queue = DispatchQueue(label: "Calculator")                              // serial queue for synchronization
            var allResults = Set<String>()
            
            DispatchQueue.concurrentPerform(iterations: iterations) { i in
                let start = i * sliceLength
                let end = i == iterations - 1 ? totalLength : start + sliceLength // last slice picks up any remainder
                
                let result = self.doPartialCalculation(with: originalData[start..<end]) // do calculation in parallel
                
                queue.sync { allResults.formUnion(result) }                             // synchronize update
            }
            
            // personally, I would not update a property from this method,
            // but rather would use local var and supply the results in a completion
            // handler parameter, and let caller update model as it sees fit.
            //
            // But if you are going to do this, synchronize the update somehow,
            // e.g., do it on the main thread.
            
            DispatchQueue.main.async {                                                  // update on main thread
                self.calculatedData = allResults                                        // or `self.calculatedData.formUnion(allResults)`, if that's what you really mean            
            }
        }
    }
    
    // note, rather than taking `[String]`, which requires us to create a new
    // `Array` instance, let's change this to take `Sequence<String>` as 
    // input ... that way we can supply array slices directly
    
    func doPartialCalculation<S>(with data: S) -> Set<String> where S: Sequence, S.Element == String {
        print("began")
        
        sleep(2)
        let someResultSet: Set<String> = ["some result"]
        
        print("ended")
        
        return someResultSet
    }
    

    Or, alternatively, you could do the updates of the local var asynchronously and keep track of them with a DispatchGroup, performing the final update (or call to the completion handler) on the .main queue:

    func doCalculation() {
        DispatchQueue.global().async { [originalData] in                                // gives me the willies to see asynchronous routine accessing property, so I might capture it here in case it ever changes to mutable property; or, better, it should be parameter of `doCalculation`
            let totalLength = originalData.count
            let iterations = 3                                                          // avoid brittle pattern of repeating this number (of values based upon it) repeatedly
            let sliceLength = totalLength / iterations
            let queue = DispatchQueue(label: "Calculator")                              // serial queue for synchronization
            let group = DispatchGroup()
            var allResults = Set<String>()
    
            DispatchQueue.concurrentPerform(iterations: iterations) { i in
                let start = i * sliceLength
                let end = i == iterations - 1 ? totalLength : start + sliceLength // last slice picks up any remainder
    
                let result = self.doPartialCalculation(with: originalData[start..<end]) // do calculation in parallel
    
                queue.async(group: group) { allResults.formUnion(result) }              // synchronize update
            }
    
            // personally, I would not update a property from this method,
            // but rather would use local var and supply the results in a completion
            // handler parameter, and let caller update model as it sees fit.
            //
            // But if you are going to do this, synchronize the update somehow,
            // e.g., do it on the main thread.
    
            group.notify(queue: .main) {
                self.calculatedData = allResults                                        // or `self.calculatedData.formUnion(allResults)`, if that's what you really mean
            }
        }
    }
    

    You can benchmark this and see whether the asynchronous update has any material impact. It probably will not in this case, but the proof is in the pudding.
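    As a quick wall-clock sketch of such a benchmark (the completion-handler shape is an assumption about how you would expose the result; for serious profiling, prefer os_signpost in Instruments, as above):

```swift
import Foundation

// Rough wall-clock timing for an asynchronous, completion-handler-based
// routine: start the clock, kick off the work, and block on a semaphore
// until the completion fires.
func measure(label: String, _ work: (@escaping () -> Void) -> Void) -> TimeInterval {
    let start = Date()
    let done = DispatchSemaphore(value: 0)

    work { done.signal() }                // the routine calls this when finished
    done.wait()

    let elapsed = Date().timeIntervalSince(start)
    print("\(label): \(String(format: "%.1f", elapsed * 1000)) ms")
    return elapsed
}
```

    For example, if doCalculation took a completion handler, measure(label: "GCD") { doCalculation(completion: $0) } would report its end-to-end time.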