Suppose I want to download 300 images in the background using Swift Concurrency. I'd like two things:
TaskGroup has many advantages: great parallelism, very cheap child tasks, and cancellation. But it does not return until all 300 child tasks have finished.
AsyncStream lets me return images as they're downloaded, but has no parallelism on its own; downloads happen one at a time, in sequence.
What I'd like to do is wrap a TaskGroup with an AsyncStream, like this:
let stream = AsyncStream(NSImage.self) { continuation in
    _ = await withTaskGroup(of: NSImage.self, returning: [NSImage].self) { taskGroup in
        let imageURLs: [URL] = ... // array of 300 URLs to download
        for imageURL in imageURLs {
            taskGroup.addTask { await downloadImage(url: imageURL) }
        }
        for await result in taskGroup {
            continuation.yield(result)
        }
        continuation.finish()
        return []
    }
}
But AsyncStream can't take an async closure. So what's the best way to achieve this behavior with Swift Concurrency?
The idea is to bridge from the synchronous context of the AsyncStream closure to Swift concurrency by creating a Task for the asynchronous work. Also remember to add an onTermination closure so that the Task is cancelled when the AsyncStream is cancelled:
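func images(for urls: [URL]) -> AsyncStream<NSImage> {
    AsyncStream { continuation in
        // The closure passed to AsyncStream is synchronous, so do the async work in a Task.
        let task = Task {
            await withTaskGroup(of: NSImage.self) { group in
                // Start one child task per URL; they run concurrently.
                for url in urls {
                    group.addTask { await self.downloadImage(url: url) }
                }
                // Forward each image to the stream as soon as its download completes.
                for await image in group {
                    continuation.yield(image)
                }
                continuation.finish()
            }
        }
        // Cancel the Task (and with it the group's child tasks) when the stream terminates.
        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}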
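To see the pattern end to end without AppKit or a network, here is a minimal sketch that swaps the image download for a stand-in. The names fakeDownload, labels(for:), and collectLabels are hypothetical and exist only for this illustration; the structure of the stream is the same as above.

```swift
import Foundation

// Hypothetical stand-in for downloadImage(url:): waits briefly, then returns a label.
func fakeDownload(_ id: Int) async -> String {
    try? await Task.sleep(nanoseconds: 10_000_000) // 10 ms of simulated network time
    return "image-\(id)"
}

// Same shape as images(for:), but yielding strings instead of NSImage.
func labels(for ids: [Int]) -> AsyncStream<String> {
    AsyncStream { continuation in
        let task = Task {
            await withTaskGroup(of: String.self) { group in
                for id in ids {
                    group.addTask { await fakeDownload(id) }
                }
                for await label in group {
                    continuation.yield(label)
                }
                continuation.finish()
            }
        }
        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}

// The caller consumes results one at a time, as each child task finishes.
func collectLabels() async -> [String] {
    var received: [String] = []
    for await label in labels(for: [1, 2, 3, 4]) {
        received.append(label)
    }
    return received
}
```

The for await loop in collectLabels ends once the stream calls finish(). The order of the received labels is not guaranteed, because the child tasks run concurrently.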
Because these network requests run concurrently, they will likely not finish in the order of the original array of URLs.
So, you might return a tuple of the original URL and the resulting image:
func images(for urls: [URL]) -> AsyncStream<(URL, NSImage)> {
    AsyncStream { continuation in
        let task = Task {
            await withTaskGroup(of: (URL, NSImage).self) { group in
                for url in urls {
                    group.addTask { await (url, self.downloadImage(url: url)) }
                }
                for await tuple in group {
                    continuation.yield(tuple)
                }
                continuation.finish()
            }
        }
        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}
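On the consuming side, one possible use of the URL in each tuple is to key the results by it, so the caller can look up a result regardless of arrival order. This is only a sketch: collectByURL and demoCollectByURL are hypothetical helpers, a String stands in for NSImage, and the example.com URLs are placeholders.

```swift
import Foundation

// Hypothetical helper: gathers (URL, value) pairs from a stream into a dictionary.
func collectByURL<Value: Sendable>(_ pairs: AsyncStream<(URL, Value)>) async -> [URL: Value] {
    var results: [URL: Value] = [:]
    for await (url, value) in pairs {
        results[url] = value
    }
    return results
}

// Demo with a hand-built stream; a String stands in for the downloaded image.
func demoCollectByURL() async -> [URL: String] {
    let first = URL(string: "https://example.com/a.png")!  // placeholder URL
    let second = URL(string: "https://example.com/b.png")! // placeholder URL
    let stream = AsyncStream<(URL, String)> { continuation in
        continuation.yield((second, "b")) // arrives first, although it was requested second
        continuation.yield((first, "a"))
        continuation.finish()
    }
    return await collectByURL(stream)
}
```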
Or perhaps an index number and the image:
func images(for urls: [URL]) -> AsyncStream<(Int, NSImage)> {
    AsyncStream { continuation in
        let task = Task {
            await withTaskGroup(of: (Int, NSImage).self) { group in
                for (index, url) in urls.enumerated() {
                    group.addTask { await (index, self.downloadImage(url: url)) }
                }
                for await tuple in group {
                    continuation.yield(tuple)
                }
                continuation.finish()
            }
        }
        continuation.onTermination = { _ in
            task.cancel()
        }
    }
}
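With the index variant, the caller can rebuild the original order by writing each result into its slot as it arrives. A minimal sketch, assuming the caller knows the number of URLs up front; reassemble and demoReassemble are hypothetical names, and a String stands in for NSImage:

```swift
import Foundation

// Hypothetical helper: places each (index, value) pair into its original position.
// Slots stay nil for any index that never arrives (for example, if the stream is cancelled early).
func reassemble<Value: Sendable>(_ pairs: AsyncStream<(Int, Value)>, count: Int) async -> [Value?] {
    var slots = [Value?](repeating: nil, count: count)
    for await (index, value) in pairs {
        slots[index] = value
    }
    return slots
}

// Demo with a hand-built stream whose elements arrive out of order.
func demoReassemble() async -> [String?] {
    let outOfOrder = AsyncStream<(Int, String)> { continuation in
        continuation.yield((2, "c"))
        continuation.yield((0, "a"))
        continuation.yield((1, "b"))
        continuation.finish()
    }
    return await reassemble(outOfOrder, count: 3)
}
```

Using optional slots means a UI can show placeholders for images that have not arrived yet and fill them in as the stream yields.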