I created a small wrapper around a PassthroughSubject which converts it into an AsyncStream.
    import Combine
    import Foundation

    protocol Channelable {}

    final class Channel {
        private let subject = PassthroughSubject<Channelable, Never>()

        func send(_ value: Channelable) {
            subject.send(value)
        }

        func getValues() -> AsyncStream<Channelable> {
            AsyncStream { continuation in
                // Forward every value from the subject's async sequence into the stream.
                let task = Task {
                    for await value in subject.values {
                        continuation.yield(value)
                    }
                }
                continuation.onTermination = { _ in
                    task.cancel()
                }
            }
        }
    }
When I do the following:
    enum Action: Channelable {
        case execute
    }

    func test() async {
        let channel = Channel()
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            channel.send(Action.execute)
            channel.send(Action.execute)
            channel.send(Action.execute)
        }
        for await value in channel.getValues() {
            print(value)
        }
    }
"execute" gets printed only once, and I would expect it to be printed three times.
What am I doing wrong?
I believe that you will find that if you insert delays between the three send calls, you will see all of the values in your for-await-in loop.
The problem appears to be related to back-pressure. Consider the following, with Task.sleep introduced to consistently manifest the issue:
    class Channel<Channelable> {
        private let subject = PassthroughSubject<Channelable, Never>()

        func send(_ value: Channelable) {
            subject.send(value)
        }

        func values() -> AsyncStream<Channelable> {
            AsyncStream { continuation in
                let task = Task {
                    for await value in subject.values {
                        continuation.yield(value)
                        // Simulate a slow consumer: nothing is requested from the subject during this delay.
                        try? await Task.sleep(for: .seconds(1.5))
                    }
                }
                continuation.onTermination = { _ in
                    task.cancel()
                }
            }
        }
    }

    enum Action {
        case foo
        case bar
        case baz
    }

    func test() async {
        let channel = Channel<Action>()
        // Send three values, one second apart, after an initial five-second wait.
        Task {
            try? await Task.sleep(for: .seconds(5))
            channel.send(.foo)
            try? await Task.sleep(for: .seconds(1))
            channel.send(.bar)
            try? await Task.sleep(for: .seconds(1))
            channel.send(.baz)
        }
        for await action in channel.values() {
            print(action)
        }
    }
That will output:
foo
baz
But if I eliminate that 1.5 second delay in the values function, so that it takes values from the subject more quickly than they are sent, the back-pressure problem goes away:
foo
bar
baz
The documentation warns us:
“A PassthroughSubject drops values if there are no subscribers, or its current demand is zero.”
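To make the "current demand is zero" case concrete, here is a minimal sketch using a hand-written subscriber. OneShotSubscriber is a hypothetical type invented for this illustration (it is not part of Combine or of the code above). It requests a single value and then asks for nothing more, which roughly mimics a consumer that is still busy when the next values arrive:

```swift
import Combine

// Hypothetical subscriber for illustration only: it asks for exactly one value.
final class OneShotSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []

    func receive(subscription: Subscription) {
        subscription.request(.max(1))   // demand is now 1
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none                    // no additional demand, so demand drops to 0
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let subject = PassthroughSubject<Int, Never>()
let subscriber = OneShotSubscriber()
subject.subscribe(subscriber)

subject.send(1)   // delivered: demand was 1
subject.send(2)   // dropped: demand is 0
subject.send(3)   // dropped: demand is 0

print(subscriber.received)   // [1]
```

The subject does not queue the second and third values for later. They are simply discarded, which is consistent with the missing values in the output above.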
In terms of solutions, you have a few options:
Interestingly, I found that the traditional sink approach did not suffer this problem. E.g., here is an example:
    class Channel<Channelable> {
        private let subject = PassthroughSubject<Channelable, Never>()
        private var cancellable: AnyCancellable?

        func send(_ value: Channelable) {
            subject.send(value)
        }

        func values() -> AsyncStream<Channelable> {
            AsyncStream { continuation in
                // sink requests unlimited demand, so the subject never sees zero demand.
                cancellable = subject.sink { value in
                    continuation.yield(value)
                }
                continuation.onTermination = { [weak self] _ in
                    self?.cancellable = nil
                }
            }
        }
    }
There are refinements that I might make in this, but this is really for illustrative purposes, rather than a solution. As you’ll see below, there are probably better alternatives, anyway, so I will not belabor this one. Just know that the sink approach does not appear to manifest the problem in question. It does, though, block send until the prior sink finishes executing (e.g., add a delay in the sink closure, and send seems to block the calling thread for that period of time).
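The blocking behavior can be observed with a small timing sketch. The half-second Thread.sleep is an arbitrary stand-in for slow work in the sink closure, chosen only for this illustration:

```swift
import Combine
import Foundation

let subject = PassthroughSubject<Int, Never>()

// Slow subscriber: the sleep stands in for real work (illustrative value only).
let cancellable = subject.sink { value in
    Thread.sleep(forTimeInterval: 0.5)
    print("received \(value)")
}

let start = Date()
subject.send(1)   // returns only after the sink closure has finished
let elapsed = Date().timeIntervalSince(start)

print(elapsed >= 0.5)   // true
```

Because the sink closure runs synchronously inside send, the caller pays for whatever the closure does. That is acceptable when the closure only calls continuation.yield, as in the Channel above, but it is worth keeping in mind.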
Apple has written an AsyncChannel type (part of the Swift Async Algorithms package) with explicit attention placed on back-pressure, notably, with “affordance of back pressure applied from the consumption site to be transmitted to the production site.” Thus, send will await the consumption of the value, but it won’t block the caller’s thread.
    import AsyncAlgorithms

    func test() async {
        let channel = AsyncChannel<Action>()
        Task {
            try? await Task.sleep(for: .seconds(5))
            // Each send suspends until the consumer has taken the value.
            await channel.send(Action.foo)
            try? await Task.sleep(for: .seconds(1))
            await channel.send(Action.bar)
            try? await Task.sleep(for: .seconds(1))
            await channel.send(Action.baz)
        }
        for await action in channel {
            print(action)
        }
    }
This is another alternative to writing your own Channel type.