
Testing parallel execution with structured concurrency


I’m testing code that uses an actor, and I’d like to test that I’m properly handling concurrent access and reentrancy. One of my usual approaches would be to use DispatchQueue.concurrentPerform to fire off a bunch of requests from different threads, and ensure my values resolve as expected. However, since the actor uses structured concurrency, I’m not sure how to actually wait for the tasks to complete.

What I’d like to do is something like:

let iterationCount = 100
let allTasksComplete = expectation(description: "allTasksComplete")
allTasksComplete.expectedFulfillmentCount = iterationCount
DispatchQueue.concurrentPerform(iterations: iterationCount) { _ in
    Task {
        // Do some async work here, and assert
        allTasksComplete.fulfill()
    }
}
wait(for: [allTasksComplete], timeout: 1.0)

However the timeout for the allTasksComplete expectation expires every time, regardless of whether the iteration count is 1 or 100, and regardless of the length of the timeout. I’m assuming this has something to do with the fact that mixing structured and DispatchQueue-style concurrency is a no-no?

How can I properly test concurrent access — specifically how can I guarantee that the actor is accessed from different threads, and wait for the test to complete until all expectations are fulfilled?


Solution

  • A few observations:

    1. When testing Swift concurrency, we no longer need to rely upon expectations. We can just mark our tests as async methods. See Asynchronous Tests and Expectations. Here is an async test adapted from that example:

      func testDownloadWebDataWithConcurrency() async throws {
          let url = try XCTUnwrap(URL(string: "https://apple.com"), "Expected valid URL.")
      
          let (_, response) = try await URLSession.shared.data(from: url)
      
          let httpResponse = try XCTUnwrap(response as? HTTPURLResponse, "Expected an HTTPURLResponse.")
          XCTAssertEqual(httpResponse.statusCode, 200, "Expected a 200 OK response.")
      }
      
    2. FWIW, while we can now use async tests when testing Swift concurrency, we still can use expectations:

      func testWithExpectation() {
          let iterations = 100
          let experiment = ExperimentActor()
      
          let e = self.expectation(description: #function)
          e.expectedFulfillmentCount = iterations
      
          for i in 0 ..< iterations {
              Task.detached {
                  let result = await experiment.reentrantCalculation(i)
                  let success = await experiment.isAcceptable(result)
                  XCTAssert(success, "Incorrect value")
                  e.fulfill()
              }
          }
      
          wait(for: [e], timeout: 10)
      }
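
      If the test method is itself async, a variant of the above can await the expectation rather than block in wait(for:timeout:). This is only a sketch, not part of the original answer: it assumes Xcode 14.3 or later, where XCTestCase offers fulfillment(of:timeout:). The class name, the test name, and the Task.yield() stand-in for real work are hypothetical.

      ```swift
      import XCTest

      final class ExpectationAsyncTests: XCTestCase {
          func testWithAsyncFulfillment() async {
              let iterations = 100

              let e = expectation(description: #function)
              e.expectedFulfillmentCount = iterations

              for _ in 0 ..< iterations {
                  Task.detached {
                      // Hypothetical stand-in for the real async work and assertions
                      await Task.yield()
                      e.fulfill()
                  }
              }

              // Suspends this test (instead of blocking its thread) until every
              // fulfillment has arrived or the timeout elapses
              await fulfillment(of: [e], timeout: 10)
          }
      }
      ```

      Because the test suspends rather than blocks, it does not hold on to a thread while the detached tasks run.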
      
    3. You said:

      However the timeout for the allTasksComplete expectation expires every time, regardless of whether the iteration count is 1 or 100, and regardless of the length of the timeout.

      We cannot comment without seeing a reproducible example of the code that you replaced with the comment “Do some async work here, and assert”. We do not need to see your actual implementation; rather, construct the simplest possible example that manifests the behavior you describe. See How to create a Minimal, Reproducible Example.

      I personally suspect that you have some other, unrelated deadlock. E.g., given that concurrentPerform blocks the thread from which you call it, maybe you are doing something that requires the blocked thread. Also, be careful with Task { ... }, which runs the task on the current actor, so if you are doing something slow and synchronous inside there, that could cause problems. We might use detached tasks, instead.

      In short, we cannot diagnose the issue without a Minimal, Reproducible Example.

    4. As a more general observation, one should be wary about mixing GCD (or semaphores or long-lived locks or whatever) with Swift concurrency, because the latter uses a cooperative thread pool, which relies upon assumptions about its threads being able to make forward progress. But if you have GCD API blocking threads, those assumptions may no longer be valid. It may not be the source of the problem here, but I mention it as a cautionary note.
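
      As an illustration of that caution: when a completion-handler API has to be called from Swift concurrency, wrapping it in a continuation suspends the task instead of blocking a pool thread on a semaphore. This is a minimal sketch, not part of the original answer; legacyFetch is a hypothetical stand-in for such an API.

      ```swift
      import Foundation

      // Hypothetical legacy API that delivers its result on a GCD queue.
      func legacyFetch(completion: @escaping @Sendable (Int) -> Void) {
          DispatchQueue.global().async { completion(42) }
      }

      // Suspends the calling task until the callback fires. No thread is blocked
      // while waiting, so the cooperative pool can keep making forward progress.
      func fetch() async -> Int {
          await withCheckedContinuation { continuation in
              legacyFetch { value in
                  // A continuation must be resumed exactly once.
                  continuation.resume(returning: value)
              }
          }
      }
      ```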

    5. As an aside, concurrentPerform (which constrains the degree of parallelism) only makes sense if the work being executed runs synchronously. Using concurrentPerform to launch a series of asynchronous tasks will not constrain the concurrency at all. (The cooperative thread pool may, but concurrentPerform will not.)

      So, for example, if we wanted to test a bunch of calculations in parallel, rather than concurrentPerform, we might use a TaskGroup:

      func testWithStructuredConcurrency() async {
          let iterations = 100
          let experiment = ExperimentActor()
      
          await withTaskGroup(of: Void.self) { group in
              for i in 0 ..< iterations {
                  group.addTask {
                      let result = await experiment.reentrantCalculation(i)
                      let success = await experiment.isAcceptable(result)
                      XCTAssert(success, "Incorrect value")
                  }
              }
          }
      
          let count = await experiment.count
          XCTAssertEqual(count, iterations)
      }
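
      If you do want to cap the number of child tasks in flight (the constraint that concurrentPerform cannot provide for asynchronous work), one common pattern is to seed the task group with a fixed number of children and add another each time one finishes. The following is a minimal sketch, not part of the original answer; runBounded and its parameters are hypothetical names.

      ```swift
      // Hypothetical helper: runs `count` jobs with at most `maxConcurrent` in flight at once.
      func runBounded(
          count: Int,
          maxConcurrent: Int,
          job: @escaping @Sendable (Int) async -> Int
      ) async -> [Int] {
          await withTaskGroup(of: (Int, Int).self) { group in
              var results = [Int](repeating: 0, count: count)
              var next = 0

              // Seed the group with the first batch of child tasks.
              while next < min(maxConcurrent, count) {
                  let seedIndex = next
                  group.addTask { (seedIndex, await job(seedIndex)) }
                  next += 1
              }

              // Each time a child finishes, store its result and start the next job.
              while let (index, value) = await group.next() {
                  results[index] = value
                  if next < count {
                      let nextIndex = next
                      group.addTask { (nextIndex, await job(nextIndex)) }
                      next += 1
                  }
              }

              return results
          }
      }
      ```

      For example, await runBounded(count: 10, maxConcurrent: 3) { $0 * $0 } returns the squares of 0 through 9 in index order, while never running more than three jobs at a time.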
      
    6. Now if you wanted to verify concurrent execution within an app, normally I would just profile the app (not unit tests) with Instruments, and either watch intervals in the “Points of Interest” tool or look at the new “Swift Tasks” tool described in WWDC 2022’s Visualize and optimize Swift concurrency video. E.g., here I have launched forty tasks and I can see that my device runs six at a time:

      [Screenshot: Instruments timeline of the forty tasks, six running at a time]

      See Alternative to DTSendSignalFlag to identify key events in Instruments? for references about the “Points of Interest” tool.

    7. If you really wanted to write a unit test to confirm concurrency, you could theoretically keep track of your own counters, e.g.,

      final class MyAppTests: XCTestCase {
          func testWithStructuredConcurrency() async {
              let iterations = 100
              let experiment = ExperimentActor()
      
              await withTaskGroup(of: Void.self) { group in
                  for i in 0 ..< iterations {
                      group.addTask {
                          let result = await experiment.reentrantCalculation(i)
                          let success = await experiment.isAcceptable(result)
                          XCTAssert(success, "Incorrect value")
                      }
                  }
              }
      
              let count = await experiment.count
              XCTAssertEqual(count, iterations, "Correct count")
      
              let degreeOfConcurrency = await experiment.maxDegreeOfConcurrency
              XCTAssertGreaterThan(degreeOfConcurrency, 1, "No concurrency")
          }
      }
      

      Where:

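      import Foundation   // for pow
      import os           // for Logger
      
      // Assumed definition of the logger used below; the subsystem and category strings are placeholders.
      let logger = Logger(subsystem: "com.example.MyApp", category: "ExperimentActor")
      
      actor ExperimentActor {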
          var degreeOfConcurrency = 0
          var maxDegreeOfConcurrency = 0
          var count = 0
      
          /// Calculate pi with Leibniz series
          ///
          /// Note: I am awaiting a detached task so that I can manifest actor reentrancy.
      
          func reentrantCalculation(_ index: Int, decimalPlaces: Int = 8) async -> Double {
              let task = Task.detached {
                  // I wouldn’t generally log in a unit test, but it’s a quick visual confirmation that I’m enjoying parallel execution
                  logger.log("starting \(index)")
                  await self.increaseConcurrencyCount()
      
                  let threshold = pow(0.1, Double(decimalPlaces))
                  var isPositive = true
                  var denominator: Double = 1
                  var result: Double = 0
                  var increment: Double
      
                  repeat {
                      increment = 4 / denominator
                      if isPositive {
                          result += increment
                      } else {
                          result -= increment
                      }
                      isPositive.toggle()
                      denominator += 2
                  } while increment >= threshold
      
                  logger.log("finished \(index)")
                  await self.decreaseConcurrencyCount()
      
                  return result
              }
      
              count += 1
      
              return await task.value
          }
      
          func increaseConcurrencyCount() {
              degreeOfConcurrency += 1
              maxDegreeOfConcurrency = max(maxDegreeOfConcurrency, degreeOfConcurrency)
          }
      
          func decreaseConcurrencyCount() {
              degreeOfConcurrency -= 1
          }
      
          func isAcceptable(_ result: Double) -> Bool {
              return abs(.pi - result) < 0.0001
          }
      }
      
    8. Please note that when testing or running on a simulator, the cooperative thread pool is somewhat constrained and does not exhibit the same degree of concurrency that you will see on an actual device.

    9. Also note that if you are testing whether a particular test exhibits parallel execution, you might want to disable the parallel execution of the tests themselves, so that other tests do not tie up your cores and prevent the test in question from running in parallel.
