Search code examples
iosarraysswiftmultithreadingasync-await

Swift async Loop in Task


I trying to understand how exactly Task works with loop.

I was experimenting with outputting numbers to the console and noticed that I was getting a result that was not what I expected.

actor ThreadSafeCollection<T> {
    private var collection: [T] = []

    func add(_ element: T) {
        collection.append(element)
    }

    func getAll() -> [T] {
        return collection
    }

    func remove(at index: Int) {
        guard collection.indices.contains(index) else { return }
        collection.remove(at: index)
    }
}

var safeCollection: ThreadSafeCollection<Int> = ThreadSafeCollection()

@Sendable func firstIterate() async -> Task<[Int], Never> {
    Task {
        for i in 0..<500 {
            await safeCollection.add(i)
        }
        return await safeCollection.getAll()
    }
}

@Sendable func secondIterate() async -> Task<[Int], Never> {
    Task {
        for i in 0..<500 {
            await safeCollection.add(i)
        }
        return await safeCollection.getAll()
    }
}

Task {
    let result = await withTaskGroup(of: [Int].self, returning: [Int].self) { taskGroup in
        taskGroup.addTask { await firstIterate().value }
        taskGroup.addTask { await secondIterate().value }
        
        var collected = [Int]()
        
        for await value in taskGroup {
            collected.append(contentsOf: value)
        }

        return collected
    }
    print(result.sorted(by: <))
}


In this example, I iterate 2 times to 500 by calling the first Iterate() and secondIterate() methods, as a result of which I expect to get an array with numbers in which each number will be repeated 2 times. But instead, I see each number 4 times in the console. [0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4...

And besides, I noticed that at the end of the array, the numbers are repeated not 4 times as at the beginning, but 3.

...494, 494, 494, 495, 495, 495, 496, 496, 496, 497, 497, 497, 498, 498, 498, 499, 499, 499]

Can anyone explain why this is happening?


Solution

  • Paulw11 explained why you are seeing, in general, four values (because you are appending results to your ThreadSafeCollection, but each task returns the whole, then-current, collection (i.e., returning the whole, combined, collection twice).

    And Cy-4AH explained why you sometimes see three occurrences of 499, because you have a race between when one task appends values and when the other task returns the (duplicative) copy of the whole collection. (This problem would be more consistently manifested if one of the tasks was appreciably slower than the other one, e.g., you insert a delay in one of those two for loops.)

    There are two solutions:

    1. Given that you are collecting the results in ThreadSafeCollection, do not return the whole collection for each task group subtask. Just return the getAll of the ThreadSafeCollection at the end:

      final class ThreadSafeCollectionExample: Sendable {
          var safeCollection: ThreadSafeCollection<Int> = []
      
          func firstIterate() async {
              for i in 0..<500 {
                  await safeCollection.append(i)
              }
          }
      
          func secondIterate() async {
              for i in 0..<500 {
                  await safeCollection.append(i)
              }
          }
      }
      
      Task {
          let experiment = ThreadSafeCollectionExample()
      
          let result = await withTaskGroup(of: Void.self) { group in // or `withDiscardingTaskGroup` in more current OS versions
              group.addTask { await experiment.firstIterate() }
              group.addTask { await experiment.secondIterate() }
      
              await group.waitForAll()
      
              return await experiment.safeCollection.getAll()
          }
          print(result.sorted(by: <))
      }
      

      That will return the safeCollection with two copies of the values 0..<500, as expected. But this introduces a lot of individual synchronizations, which can impact performance. (See point 2, below, if you want to see how to avoid that.)

      Unrelated, but if you forgive me, I tweaked ThreadSafeCollection to use more conventional naming conventions (e.g., append vs add). I also added ExpressibleByArrayLiteral so I could use the more concise initialization syntax. Also, we should probably make sure that T conforms to Sendable:

      actor ThreadSafeCollection<T: Sendable>: ExpressibleByArrayLiteral {
          private var collection: [T]
      
          init(collection: [T] = []) {
              self.collection = collection
          }
      
          convenience init(arrayLiteral: T...) {
              self.init(collection: arrayLiteral)
          }
      
          func append(_ element: T) {
              collection.append(element)
          }
      
          func append(contentsOf elements: [T]) {
              collection.append(contentsOf: elements)
          }
      
          func getAll() -> [T] {
              return collection
          }
      
          func remove(at index: Int) {
              guard collection.indices.contains(index) else { return }
              collection.remove(at: index)
          }
      }
      
    2. Retire ThreadSafeCollection entirely, and have the routines return local variables:

      final class UsingLocalCollections: Sendable {
          func firstIterate() async -> [Int] {
              var values: [Int] = []
              for i in 0..<500 {
                  values.append(i)
              }
              return values
          }
      
          func secondIterate() async -> [Int] {
              var values: [Int] = []
              for i in 0..<500 {
                  values.append(i)
              }
              return values
          }
      }
      
      Task {
          let experiment = UsingLocalCollections()
      
          let result = await withTaskGroup(of: [Int].self) { group in // or `withDiscardingTaskGroup` in more current OS versions
              group.addTask { await experiment.firstIterate() }
              group.addTask { await experiment.secondIterate() }
      
              return await group.reduce(into: []) {
                  $0.append(contentsOf: $1)
              }
          }
          print(result.sorted(by: <))
      }
      

      This approach has the virtue of not needing to synchronize each of the 1,000 updates to the collections (which will defeat any performance benefits you hoped for by running these concurrently).


    As an aside, going back to your original example, I might recommend avoiding unnecessary unstructured concurrency, i.e., the use of Task {…}.

    So, rather than:

    func firstIterate() -> Task<[Int], Never> {
        Task {
            for i in 0..<500 {
                …
            }
            return await safeCollection.getAll()
        }
    }
    

    You can remain within structured concurrency and simplify this to:

    func firstIterate() async -> [Int] {
        for i in 0..<500 {
            …
        }
        return await safeCollection.getAll()
    }
    

    And then, instead of:

    taskGroup.addTask { await firstIterate().value }
    

    You would just:

    taskGroup.addTask { await firstIterate() }
    

    Not only is it cleaner, but by remaining in structured concurrency, you enjoy automatic propagation of task cancelation, etc. Unstructured concurrency gives you more fine-grained control, but encumbers you with the burden of the correctness of the code. So use unstructured concurrency where you really need it (which is not the case here), but favor structured concurrency where possible. It is simpler. (If you really need example of how to use unstructured concurrency properly, let me know.)