Swift's concurrent tasks with Mutex similar to Kotlin's Coroutines


I have the following Kotlin sample code that does asynchronous work with coroutines, and I want to achieve the same in Swift:

Kotlin:

private val mutex = Mutex()

private suspend fun startConcurrentJobs() = coroutineScope {
    launch {
        asyncJob()
    }
    launch {
        asyncJob()
    }
}

private suspend fun asyncJob() {
    subAsyncJob1() // this job can be called by different coroutines at the same time

    mutex.withLock {
        // this job is not allowed to be called by different coroutines at the same time
        subAsyncJob2()
    }
}

private suspend fun subAsyncJob1() {
    // TODO: Concurrent work
}

private suspend fun subAsyncJob2() {
    // TODO: Protected work (exclusive access)
}

I have come up with the following two implementations in Swift:

  1. Using an actor:
actor Mutex {
    func lockAsyncJob<T>(_ operation: () async -> T) async -> T {
        return await operation()
    }
}

let mutex = Mutex()

private func startConcurrentJobs() async {
    await withTaskGroup(of: Void.self) { taskGroup in
        taskGroup.addTask {
            await self.asyncJob()
        }
        taskGroup.addTask {
            await self.asyncJob()
        }
    }
}

private func asyncJob() async {
    await subAsyncJob1() // this job can be called by different tasks at the same time
    
    await mutex.lockAsyncJob {
        // this job is not allowed to be called by different tasks at the same time
        await subAsyncJob2()
    }
}

private func subAsyncJob1() async {
    // TODO: Concurrent work
}

private func subAsyncJob2() async {
    // TODO: Protected work (exclusive access)
}
  2. Using a DispatchQueue:
let mutexQueue = DispatchQueue(label: "com.example.mutexQueue")

private func startConcurrentJobs() async {
    await withTaskGroup(of: Void.self) { taskGroup in
        taskGroup.addTask {
            await self.asyncJob()
        }
        taskGroup.addTask {
            await self.asyncJob()
        }
    }
}

private func asyncJob() async {
    await subAsyncJob1() // this job can be called by different tasks at the same time
    
    await withCheckedContinuation { continuation in
        mutexQueue.async {
            // This block is executed serially on the queue, ensuring exclusive access
            Task {
                await self.subAsyncJob2()
                continuation.resume()
            }
        }
    }
}

private func subAsyncJob1() async {
    // TODO: Concurrent work
}
    
private func subAsyncJob2() async {
    // TODO: Protected work (exclusive access)
}

I guess both implementations should achieve the same result as Kotlin's coroutines, though handling synchronized code in Swift is a bit trickier than with Kotlin's Mutex.

Swift also has NSLock and DispatchSemaphore, but they are not safe to use here and can cause deadlocks, because they are designed for threads.

I haven't worked much with Swift, so I'm trying to find the correct approach.


Solution

  • In Swift you don't need a Mutex here. You just need an actor and a Task.

    actor A {
        // The actor removes the need for a Mutex here
        private var job2Task: Task<Void, Never>?
    
        // I'm not sure why this is private in your example. How do you run this?
        public func startConcurrentJobs() async {
            await withTaskGroup(of: Void.self) { taskGroup in
                taskGroup.addTask {
                    await self.asyncJob()
                }
                taskGroup.addTask {
                    await self.asyncJob()
                }
            }
        }
    
        // Here's all the magic (well, nothing magic, just actors)
        private func asyncJob() async {
    
            // this job can be called by different tasks at the same time
            await subAsyncJob1()
    
            // Wait for the current task (if any) to complete. This is extremely
            // flexible. You could check for `!= nil` and return here or throw an
            // error or cancel the previous task. But it's ok for other callers to
            // re-enter `asyncJob()` while we `await`. The loop is because another
            // waiter might get in front of us in line.
            // Note that this is an "unfair" lock. There's no promise that a waiter
            // will ever be served if this is highly contended. (See below.)
            while job2Task != nil {
                await job2Task?.value
            }
    
            // Just to prove the point. You don't need this.
            assert(job2Task == nil)
    
            if Task.isCancelled { return }
    
            // This is an actor. We can know that nothing has occurred between the
            // `await job2Task?.value` and now. And because the end of this Task will
            // set `job2Task = nil`, we know that it is now nil.
            job2Task = Task {
                // When the task completes, no matter how, clear it
                defer { job2Task = nil }
                await subAsyncJob2()
            }
    
            // Again, no race conditions. This is an actor. We know nothing can
            // happen between assigning `job2Task` and awaiting it here. You could
            // even skip the `await` if you don't need it to finish before returning.
            await job2Task?.value
        }
    
        private func subAsyncJob1() async {
            // TODO: Concurrent work
        }
    
        private func subAsyncJob2() async {
            // TODO: Protected work (exclusive access)
        }
    }
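
    The comments above mention that, instead of waiting, you could check for `!= nil` and return. A minimal sketch of that "skip if busy" variant follows. All names here (`SkippingRunner`, `runIfIdle`, `protectedWork`, `countRuns`) are hypothetical and only for illustration, and the short sleep is a stand-in for real work:

```swift
// Hypothetical names throughout: SkippingRunner, runIfIdle, protectedWork, countRuns.
actor SkippingRunner {
    private var job2Task: Task<Void, Never>?
    private(set) var runs = 0

    /// Returns `false` without waiting if protected work is already in flight.
    func runIfIdle() async -> Bool {
        guard job2Task == nil else { return false }

        // There is no suspension point between the check above and the
        // assignment below, so no other caller can slip in between them.
        let task = Task {
            defer { job2Task = nil }
            await protectedWork()
        }
        job2Task = task
        await task.value
        return true
    }

    private func protectedWork() async {
        runs += 1
        try? await Task.sleep(nanoseconds: 1_000_000) // stand-in for real work
    }
}

/// Fires `callers` concurrent requests and reports how many were accepted
/// and how many times the protected work actually ran.
func countRuns(callers: Int) async -> (accepted: Int, runs: Int) {
    let runner = SkippingRunner()
    let accepted = await withTaskGroup(of: Bool.self) { group -> Int in
        for _ in 0..<callers {
            group.addTask { await runner.runIfIdle() }
        }
        var count = 0
        for await ok in group {
            if ok { count += 1 }
        }
        return count
    }
    let runs = await runner.runs
    return (accepted, runs)
}
```

    How many callers get skipped depends on scheduling, so the only things you can rely on are that at least one caller is accepted and that the protected work runs exactly once per accepted caller.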
    

    You can take this one step further and run job1 and job2 in parallel if they don't depend on each other:

    private func asyncJob() async {
    
        await withTaskGroup(of: Void.self) { group in
    
            // Kick off Job1
            group.addTask { await self.subAsyncJob1() }
    
            // Wait for any previous Job2 to finish
            while job2Task != nil {
                await job2Task?.value
            }
    
            if Task.isCancelled { return }
    
            // Kick off Job2
            job2Task = Task {
                // When the task completes, no matter how, clear it
                defer { job2Task = nil }
                await subAsyncJob2()
            }
    
            // And wait for it in parallel with Job1
            group.addTask { await self.job2Task?.value }
    
            await group.waitForAll()
        }
    }
    

    In most cases you'll want to put the synchronization logic directly in subAsyncJob2 rather than in the caller like this, but it's very similar either way.
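
    As a rough sketch of that arrangement, the wait loop can move into `subAsyncJob2` so callers never see the lock. The `Worker` actor, the `id` parameter, the `log` array, and the `protectedWork` and `runWorkers` helpers are assumptions added for illustration, not part of the original code:

```swift
// Hypothetical sketch: the lock lives inside subAsyncJob2 instead of its caller.
actor Worker {
    private var job2Task: Task<Void, Never>?
    private(set) var log: [String] = []

    func asyncJob(id: Int) async {
        await subAsyncJob1(id: id)
        await subAsyncJob2(id: id) // the caller no longer deals with the lock
    }

    private func subAsyncJob1(id: Int) async {
        log.append("job1-\(id)") // concurrent work would go here
    }

    private func subAsyncJob2(id: Int) async {
        // Wait for any protected work already in flight (unfair lock).
        while let running = job2Task {
            await running.value
        }
        if Task.isCancelled { return }

        let task = Task {
            defer { job2Task = nil }
            await protectedWork(id: id)
        }
        job2Task = task
        await task.value
    }

    private func protectedWork(id: Int) async {
        log.append("begin-\(id)")
        // Suspension point: without the lock another caller could interleave here.
        try? await Task.sleep(nanoseconds: 1_000_000)
        log.append("end-\(id)")
    }
}

/// Runs `count` jobs concurrently and returns the recorded event log.
func runWorkers(count: Int) async -> [String] {
    let worker = Worker()
    await withTaskGroup(of: Void.self) { group in
        for id in 0..<count {
            group.addTask { await worker.asyncJob(id: id) }
        }
    }
    return await worker.log
}
```

    In the returned log, each `begin-N` entry should be followed by its own `end-N` before any other `begin` appears, while the `job1-N` entries can land anywhere.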


    As mentioned in the comments above, this while loop is an unfair lock: jobs do not run in any particular order. You can make a fair lock that orders the requests, but it can make things like cancellation and error handling a little harder to reason about. It looks like this:

    // Make the property non-optional
    private var job2Task: Task<Void, Never> = Task {}
    
    private func asyncJob() async {
        await withTaskGroup(of: Void.self) { group in
            group.addTask { await self.subAsyncJob1() }
    
            // Each task awaits the former one. The `[...]` makes a local copy
            job2Task = Task { [job2Task] in
                await job2Task.value
    
                if Task.isCancelled { return }
    
                // The previous task has finished, so it is our turn to run
                await subAsyncJob2()
            }
    
            group.addTask { await self.job2Task.value }
    
            await group.waitForAll()
        }
    }
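
    To see the ordering guarantee in isolation, here is a small self-contained sketch of the same chaining idea. `FairRunner`, `enqueue`, `order`, `protectedWork`, and `runFairDemo` are hypothetical names, and the sleep durations are chosen so that later requests would finish first if they were not chained:

```swift
// Hypothetical sketch of the fair (chained) lock in isolation.
actor FairRunner {
    // Start with an already-finished task so the first request has something to await.
    private var lastTask: Task<Void, Never> = Task {}
    private(set) var order: [Int] = []

    /// Chains the protected work behind everything enqueued before it.
    @discardableResult
    func enqueue(id: Int) -> Task<Void, Never> {
        // `[lastTask]` captures the current tail of the chain by value.
        let task = Task { [lastTask] in
            await lastTask.value
            if Task.isCancelled { return }
            await protectedWork(id: id)
        }
        lastTask = task
        return task
    }

    private func protectedWork(id: Int) async {
        // Later ids sleep less, so any ordering comes from the chain, not from timing.
        try? await Task.sleep(nanoseconds: UInt64(max(1, 5 - id)) * 1_000_000)
        order.append(id)
    }
}

/// Enqueues five requests in order and returns the order in which they ran.
func runFairDemo() async -> [Int] {
    let runner = FairRunner()
    var last: Task<Void, Never>?
    for id in 0..<5 {
        last = await runner.enqueue(id: id)
    }
    await last?.value // the last link finishes only after all earlier ones
    return await runner.order
}
```

    Because each task awaits the one enqueued before it, the work runs in enqueue order. Note that if one link is cancelled it skips its own work but still lets the rest of the chain proceed, which is part of what makes cancellation harder to reason about here.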