
AsyncStream spams view, where AsyncPublisher does not


I'm running into a behavior with AsyncStream I don't quite understand.

When I have an actor with a published variable, I can "subscribe" to it via an AsyncPublisher and it behaves as expected, updating only when there is a change in value. If I create an AsyncStream with a synchronous context (but with a potential task retention problem) it also behaves as expected.

The weirdness happens when I try to wrap that publisher in an AsyncStream with an asynchronous context. It seems to start spamming the view with an update on every loop iteration, NOT only when there is a change.

What am I missing about AsyncStream.init(unfolding:onCancel:) that causes this behavior?

https://developer.apple.com/documentation/swift/asyncstream/init(unfolding:oncancel:)?

import Foundation
import SwiftUI



actor TestService {
    static let shared = TestService()
    
    @MainActor @Published var counter:Int = 0
    
    @MainActor public func updateCounter(by delta:Int) async {
        counter = counter + delta
    }
    
    public func asyncStream() -> AsyncStream<Int> {
        return AsyncStream.init(unfolding: unfolding, onCancel: onCancel)
        
        //() async -> _?
        func unfolding() async -> Int? {
            for await n in $counter.values {
                //print("\(location)")
                return n
            }
            return nil
        }
        
        //optional
        @Sendable func onCancel() -> Void {
            print("confirm counter got canceled")
        }
    }
    
    public func syncStream() -> AsyncStream<Int> {
        AsyncStream { continuation in
            let streamTask = Task {
                for await n in $counter.values {
                    continuation.yield(n)
                }
            }

            continuation.onTermination = { @Sendable _ in
                streamTask.cancel()
                print("StreamTask Canceled")
            }

        }
    }
    
}

struct ContentView: View {
    var body: some View {
        VStack {
            TestActorButton()
            HStack {
                //TestActorViewA() //<-- uncomment at your own risk. 
                TestActorViewB()
                TestActorViewC()
            }
        }
        .padding()
    }
}


struct TestActorButton:View {
    var counter = TestService.shared
    
    
    var body: some View {
        Button("increment counter") {
            Task { await counter.updateCounter(by: 2) }
        }
    }
}


struct TestActorViewA:View {
    var counter = TestService.shared
    @State var counterVal:Int = 0
    
    var body: some View {
        Text("\(counterVal)")
            .task {
                //Fires constantly.
                for await value in await counter.asyncStream() {
                    print("View A Value: \(value)")
                    counterVal = value
                }
            }
    }
}

struct TestActorViewB:View {
    var counter = TestService.shared
    @State var counterVal:Int = 0
    
    var body: some View {
        Text("\(counterVal)")
            .task {
                //Behaves like one would expect. Fires once per change.
                for await value in await counter.$counter.values {
                    print("View B Value: \(value)")
                    counterVal = value
                }
            }
    }
}

struct TestActorViewC:View {
    var counter = TestService.shared
    @State var counterVal:Int = 0
    
    var body: some View {
        Text("\(counterVal)")
            .task {
                //Also only fires on update
                for await value in await counter.syncStream() {
                    print("View C Value: \(value)")
                    counterVal = value
                }
            }
    }
}


Solution

  • The real solution to wrapping a publisher appears to be to stick to the synchronous context initializer and have it cancel its own task:

    public func stream() -> AsyncStream<Int> {
        AsyncStream { continuation in
            let streamTask = Task {
                for await n in $counter.values {
                    // do hard work to transform n
                    continuation.yield(n)
                }
            }

            continuation.onTermination = { @Sendable _ in
                streamTask.cancel()
                print("StreamTask Canceled")
            }
        }
    }
    

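    From what I can tell, the "unfolding" style initializer for AsyncStream is simply not a fit for wrapping an AsyncPublisher. The stream calls the "unfolding" function every time the consumer asks for the next element, so it "pulls" at the published value from within the stream. Each call here starts a fresh iteration of $counter.values, and an @Published publisher hands every new subscriber its current value right away, so the stream will just keep pushing values from that infinite well.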
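    The pull behavior can be reproduced without Combine. The following is a minimal sketch, not the code from the question: `CallLog`, `replayingStream`, and `collect` are hypothetical names, and the constant `current` stands in for an @Published value that is replayed to each new subscriber. The `maxCalls` cap exists only so the demo terminates.

```swift
// Hypothetical helper: counts how often the unfolding closure runs.
actor CallLog {
    private(set) var calls = 0

    func record() -> Int {
        calls += 1
        return calls
    }
}

// Assumption: `current` mimics a fresh subscription that immediately
// replays the latest value, as happens on every unfolding call in the question.
func replayingStream(current: Int, log: CallLog, maxCalls: Int) -> AsyncStream<Int> {
    AsyncStream(unfolding: {
        let n = await log.record()
        // Returning nil ends the stream; without this cap it would never stop.
        guard n <= maxCalls else { return nil }
        return current
    })
}

// Drains a stream into an array so the result can be inspected.
func collect(_ stream: AsyncStream<Int>) async -> [Int] {
    var out: [Int] = []
    for await value in stream {
        out.append(value)
    }
    return out
}
```

    Calling `await collect(replayingStream(current: 7, log: CallLog(), maxCalls: 5))` yields the same value five times even though nothing changed, which is the spam seen in View A.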

    It seems like the "unfolding" style initializer is best used when processing a finite (but potentially very large) list of items, or when generating one's own values from scratch... something like:

    struct NumberQueuer {
        let numbers:[Int]
        
        public func queueStream() -> AsyncStream<Int> {
            var iterator = AsyncArray(values: numbers).makeAsyncIterator()
            print("Queue called")
            return AsyncStream.init(unfolding: unfolding, onCancel: onCancel)
            
            //() async -> _?
            func unfolding() async -> Int? {
                do {
                    if let item = try await iterator.next() {
                        return item
                    }
                } catch let error {
                    print(error.localizedDescription)
                }
                return nil
                
            }
            
            //optional
            @Sendable func onCancel() -> Void {
                print("confirm NumberQueue got canceled")
            }
        }
        
    }
    
    public struct AsyncArray<Element>: AsyncSequence, AsyncIteratorProtocol {
        
        let values:[Element]
        let delay:TimeInterval
        
        var currentIndex = -1
        
        public init(values: [Element], delay:TimeInterval = 1) {
            self.values = values
            self.delay = delay
        }
        
        public mutating func next() async throws -> Element? {
            currentIndex += 1
            guard currentIndex < values.count else {
                return nil
            }
            try await Task.sleep(nanoseconds: UInt64(delay * 1E09))
            return values[currentIndex]
        }
        
        public func makeAsyncIterator() -> AsyncArray {
            self
        }
    }
    
    

    One can force the unfolding type to work with an @Published by creating a buffer array that is checked repeatedly. The variable wouldn't actually need to be @Published anymore. This approach has a lot of problems but it can be made to work. If interested, I put it in a repo with a bunch of other AsyncStream examples. https://github.com/carlynorama/StreamPublisherTests
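    A rough sketch of that buffer idea, under stated assumptions: this is not the code from the repo, `ValueBuffer` and `bufferedStream` are hypothetical names, and the polling interval is arbitrary. The unfolding closure only returns when the buffer actually holds something, so the consumer sees one element per pushed value, at the cost of polling.

```swift
// Hypothetical buffer that producers push into and the stream pops from.
actor ValueBuffer {
    private var pending: [Int] = []

    func push(_ value: Int) {
        pending.append(value)
    }

    func pop() -> Int? {
        pending.isEmpty ? nil : pending.removeFirst()
    }
}

// Assumption: polling every 10 ms is acceptable for the demo.
func bufferedStream(_ buffer: ValueBuffer,
                    pollNanoseconds: UInt64 = 10_000_000) -> AsyncStream<Int> {
    AsyncStream(unfolding: {
        // Keep checking until a value arrives or the consuming task is cancelled.
        while !Task.isCancelled {
            if let value = await buffer.pop() {
                return value
            }
            // Sleep throws on cancellation; the loop condition then ends the stream.
            try? await Task.sleep(nanoseconds: pollNanoseconds)
        }
        return nil
    })
}
```

    Whoever changes the value would call `push` instead of assigning to an @Published property. The repeated checking is one of the problems mentioned above: the stream wakes up on a timer even when nothing has changed.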

    This article was very helpful in sorting this out: https://www.raywenderlich.com/34044359-asyncsequence-asyncstream-tutorial-for-ios

    As was this video: https://www.youtube.com/watch?v=UwwKJLrg_0U