Search code examples
swiftcombineswift-concurrency

PassthroughSubject's AsyncPublisher values property not producing all values


I created a small wrapper around a PassthroughSubject which converts it into an AsyncStream.

protocol Channelable {}

final class Channel {
    private let subject = PassthroughSubject<Channelable, Never>()
    
    func send(_ value: Channelable) {
        subject.send(value)
    }
    
    func getValues() -> AsyncStream<Channelable> {
        AsyncStream { continuation in
            let task = Task {
                for await value in subject.values {
                    continuation.yield(value)
                }
            }
            
            continuation.onTermination = { _ in
                task.cancel()
            }
        }
    }
}

When I do the following:

enum Action: Channelable {
    case execute
}

func test() async {
        let channel = Channel()
        
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            channel.send(Action.execute)
            channel.send(Action.execute)
            channel.send(Action.execute)
        }
        
        for await value in channel.getValues() {
            print(value)
        }
    }

"execute" gets printed only once, and I would expect it to be printed three times.

What am I doing wrong?


Solution

  • I believe that you will find that if you insert delays between the three send calls, you will see all of the values in your for-await-in loop.

    The problem appears to be related to back-pressure. Consider the following, with Task.sleep introduced to consistently manifest the issue:

    class Channel<Channelable> {
        private let subject = PassthroughSubject<Channelable, Never>()
        
        func send(_ value: Channelable) {
            subject.send(value)
        }
        
        func values() -> AsyncStream<Channelable> {
            AsyncStream { continuation in
                let task = Task {
                    for await value in subject.values {
                        continuation.yield(value)
                        try? await Task.sleep(for: .seconds(1.5))
                    }
                }
                
                continuation.onTermination = { _ in
                    task.cancel()
                }
            }
        }
    }
    
    enum Action {
        case foo
        case bar
        case baz
    }
    
    func test() async {
        let channel = Channel<Action>()
        
        Task {
            try? await Task.sleep(for: .seconds(5))
            channel.send(.foo)
            try? await Task.sleep(for: .seconds(1))
            channel.send(.bar)
            try? await Task.sleep(for: .seconds(1))
            channel.send(.baz)
        }
        
        for await action in channel.values() {
            print(action)
        }
    }
    

    That will output:

    foo
    baz
    

    But if I eliminate that 1.5 second delay in the values function, so that it can yield the values more quickly than they are consumed, the back-pressure problem goes away:

    foo
    bar
    baz
    

    The documentation warns us:

    A PassthroughSubject drops values if there are no subscribers, or its current demand is zero.


    In terms of solutions, you have a few options:

    1. Interestingly, I found that the traditional sink approach did not suffer this problem. E.g., here is an example:

      class Channel<Channelable> {
          private let subject = PassthroughSubject<Channelable, Never>()
          private var cancellable: AnyCancellable?
      
          func send(_ value: Channelable) {
              subject.send(value)
          }
      
          func values() -> AsyncStream<Channelable> {
              AsyncStream { continuation in
                  cancellable = subject.sink { value in
                      continuation.yield(value)
                  }
      
                  continuation.onTermination = { [weak self] _ in
                      self?.cancellable = nil
                  }
              }
          }
      }
      

      There are refinements that I might make in this, but this is really for illustrative purposes, rather than a solution. As you’ll see below, there are probably better alternatives, anyway, so I will not belabor this one. Just know that the sink approach does not appear to manifest the problem in question. It does, though, block send until the prior sink finishes executing (e.g., add a delay in sink closure, and send seems to block the calling thread for that period of time.

    2. Apple has written a AsyncChannel (part of the Swift Async Algorithms package) with explicit attention placed on back-pressure, notably, with “affordance of back pressure applied from the consumption site to be transmitted to the production site.” Thus, the send will await the asynchronous value, but it won’t block the caller’s thread.

      func test() async {
          let channel = AsyncChannel<Action>()
      
          Task {
              try? await Task.sleep(for: .seconds(5))
              await channel.send(Action.foo)
              try? await Task.sleep(for: .seconds(1))
              await channel.send(Action.bar)
              try? await Task.sleep(for: .seconds(1))
              await channel.send(Action.baz)
          }
      
          for await action in channel {
              print(action)
          }
      }
      

      This is another alternative to writing your own Channel type.