
Swift Combine: enqueue updates to one publisher behind another publisher


I have a situation where my code needs to make one network call to fetch a bunch of items, but while it waits for those to come down, another network call might fetch updates to those items. I'd love to be able to queue those secondary results until the first call has finished. Is there a way to accomplish that with Combine?

Importantly, I am not able to wait before making the second request. It’s actually a connection to a websocket that gets made at the same time as the first request, and the updates come over the websocket outside of my control.

Update

After examining Matt’s thorough book on Combine, I settled on .prepend(). But as Matt warned me in the comments, .prepend() doesn’t even subscribe to the other publisher until after the first one completes. This means I miss any values sent before then. What I need is something like a Subject that queues values until it has a subscriber, and perhaps that’s not so hard to make. Anyway, this is where I got:

Initially I was going to use .append(), but I realized with .prepend() I could avoid keeping a reference to one of the publishers. So here’s a simplified version of what I’ve got. There might be syntax errors in this, as I’ve whittled it down from my (employer’s) code.

There’s the ItemFeed, which fetches a list of items and simultaneously handles item update events. The update events can arrive before the initial list of items, so they must be sequenced via Combine to arrive after it. I attempt to do this by prepending the initial items source to the updateItems PassthroughSubject.

Below that is an XCTestCase that simulates a lengthy initial item load and sends an update before that load can complete. It subscribes to changes to the list of items and checks that the first change is the initial 63 items and the second is 64 items (in this case, the “update” adds an item).

Unfortunately, while the initial list is published, the update never arrives. I also tried removing the .output(at:) operators, but each sink is still called only once.

After the test case sets up the delayed “fetch” and subscribes to changes in feed.items, it calls feed.handleItemUpdatedEvent(_:). This calls ItemFeed.updateItems.send(_:), but that value is simply lost.

import Combine

class
ItemFeed
{
    typealias   InitialItemsSource      =   Deferred<Future<[[String : Any]], Error>>
    
                let updateItems         =   PassthroughSubject<[Item], Error>()
                var funnel              :   AnyCancellable?
    
    @Published  var items               =   [Item]()
    
    
    
    init(initialItemSource inSource: InitialItemsSource)
    {
        //  Passthrough subject each time items are updated…
        
        var pub = self.updateItems.eraseToAnyPublisher()
        
        //  Prepend the initial items we need to fetch…
        
        let initialItems = inSource.tryMap { try $0.map { try Item(object: $0) } }
        pub = pub.prepend(initialItems).eraseToAnyPublisher()
        
        //  Sink on the funnel to add or update self.items…
        
        self.funnel =
            pub.sink { inCompletion in
                //  Handle errors
            }
            receiveValue: { [weak self] inItems in
                self?.update(items: inItems)
            }
    }
    
    func handleItemUpdatedEvent(_ inItem: Item) {
        self.updateItems.send([inItem])
    }
    
    func update(items inItems: [Item]) {
        //  Update or add inItems to self.items
    }
}

import XCTest

class
ItemFeedTests : XCTestCase
{
    func
    testShouldUpdateItems()
        throws
    {
        //  Set up a mock source of items…
        
        let source = fetchItems(named: "items", delay: 3.0)      //  63 items
        
        let expectation = XCTestExpectation(description: "testShouldUpdateItems")
        expectation.expectedFulfillmentCount = 2
        
        let feed = ItemFeed(initialItemSource: source)
        
        let sub1 = feed.$items
                    .output(at: 0)
                    .receive(on: DispatchQueue.main)
                    .sink { inItems in
                        expectation.fulfill()
                        
                        debugLog("Got first items: \(inItems.count)")
                        XCTAssertEqual(inItems.count, 63)
                    }
        
        let sub2 = feed.$items
                    .output(at: 1)
                    .receive(on: DispatchQueue.main)
                    .sink { inItems in
                        expectation.fulfill()

                        debugLog("Got second items: \(inItems.count)")
                        XCTAssertEqual(inItems.count, 64)
                    }
        
        //  Send an update right away…
        
        let item = try loadItem(named: "Item3")
        feed.handleItemUpdatedEvent(item)
        
        XCTAssertEqual(feed.items.count, 0)         //  Should be no items yet
        
        //  Wait for stuff to complete…
        
        wait(for: [expectation], timeout: 10.0)
        
        sub1.cancel()           //  Not necessary, but silences the compiler warning
        sub2.cancel()
    }
}   
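The lost update can be reproduced without any of the network code. This is a minimal sketch, assuming two PassthroughSubjects stand in for the initial fetch and the websocket updates (the names `initial`, `updates` and the `events` log are illustrative, not from the code above); a handleEvents(receiveSubscription:) probe records when prepend(_:) actually subscribes to the updates publisher:

```swift
import Combine

// Illustrative stand-ins: `initial` plays the slow initial fetch,
// `updates` plays the websocket update subject.
let initial = PassthroughSubject<Int, Never>()
let updates = PassthroughSubject<Int, Never>()
var events = [String]()

let cancellable = updates
    .handleEvents(receiveSubscription: { _ in events.append("subscribed to updates") })
    .prepend(initial)
    .sink { events.append("received \($0)") }

updates.send(2)                       // nothing is subscribed to `updates` yet, so this is dropped
initial.send(1)
initial.send(completion: .finished)   // prepend subscribes to `updates` only now
updates.send(3)

print(events)
// ["received 1", "subscribed to updates", "received 3"]
```

Writing it as initial.append(updates) builds the same Publishers.Concatenate (prefix `initial`, suffix `updates`), so it drops the early value in exactly the same way.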

Solution

  • After a fair bit of trial and error, I found a solution. I created a custom Publisher and Subscription that immediately subscribes to its upstream publisher and begins enqueuing elements (up to a configurable capacity, beyond which the oldest values are discarded). It then waits for a subscriber to come along, hands that subscriber every value received so far, and then continues to deliver new values as they arrive. Here’s a marble diagram:

    [Marble diagram of the queueing publisher: image not available]

    I then use this in conjunction with .prepend() like so:

    extension
    Publisher
    {
        func
        enqueue<P>(gatedBy inGate: P, capacity inCapacity: Int = .max)
            -> AnyPublisher<Self.Output, Self.Failure>
            where
                P : Publisher,
                P.Output == Output,
                P.Failure == Failure
        {
            let qp = Publishers.Queueing(upstream: self, capacity: inCapacity)
            let r = qp.prepend(inGate).eraseToAnyPublisher()
            return r
        }
    }
    

    And this is how you use it…

    func
    testShouldReturnAllItemsInOrder()
    {
        let gate = PassthroughSubject<Int, Never>()
        let stream = PassthroughSubject<Int, Never>()
        
        var results = [Int]()
        
        let sub = stream.enqueue(gatedBy: gate)
                    .sink
                    { inElement in
                        debugLog("element: \(inElement)")
                        results.append(inElement)
                    }
        stream.send(3)
        stream.send(4)
        stream.send(5)
        
        XCTAssertEqual(results.count, 0)
        
        gate.send(1)
        gate.send(2)
        gate.send(completion: .finished)
        
        XCTAssertEqual(results.count, 5)
        XCTAssertEqual(results, [1,2,3,4,5])
        
        sub.cancel()
    }
    

    This prints what you would expect:

    element: 1
    element: 2
    element: 3
    element: 4
    element: 5
    

    It works because calling .enqueue(gatedBy:) creates the queueing publisher qp, which immediately subscribes to stream and enqueues any values it sends. It then calls .prepend() on qp; when something subscribes to the result, it first subscribes to gate and waits for it to complete. Only when gate finishes does it subscribe to qp, which immediately delivers all the enqueued values and then continues to relay values from the upstream publisher.
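    The essential piece is that the queueing publisher subscribes to its upstream in its initializer, before anything subscribes to it. For comparison, here is what happens if you instead put Combine's built-in buffer(size:prefetch:whenFull:) in front of prepend(_:) (my own illustration, not part of the original answer): buffer, like other operators, subscribes to its upstream only when it is itself subscribed to, and prepend(_:) does that only after gate finishes, so the early values are still lost.

    ```swift
    import Combine

    let gate = PassthroughSubject<Int, Never>()
    let stream = PassthroughSubject<Int, Never>()
    var results = [Int]()

    // The buffer sits between `stream` and prepend, but it doesn't subscribe
    // to `stream` until prepend subscribes to it.
    let cancellable = stream
        .buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)
        .prepend(gate)
        .sink { results.append($0) }

    stream.send(3)                      // no subscriber on `stream` yet: dropped
    stream.send(4)                      // dropped
    gate.send(1)
    gate.send(2)
    gate.send(completion: .finished)    // the buffer subscribes to `stream` only now
    stream.send(5)

    print(results)   // [1, 2, 5], whereas enqueue(gatedBy:) yields [1, 2, 3, 4, 5]
    ```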

    Here’s the code I finally ended up with.

    //
    //  QueuingPublisher.swift
    //  Latency: Zero, LLC
    //
    //  Created by Rick Mann on 2021-06-03.
    //
    
    import Combine
    import Foundation
    
    
    
    extension
    Publishers
    {
        final
        class
        Queueing<Upstream: Publisher>: Publisher
        {
            typealias Output = Upstream.Output
            typealias Failure = Upstream.Failure
            
        private     let upstream            :   Upstream
        private     let capacity            :   Int
        private     var queue               :   [Output]                                                =   [Output]()
        private     var upstreamSubscription:   Subscription?                                           =   nil
        private     var upstreamCompletion  :   Subscribers.Completion<Failure>?                        =   nil
        
        //  Weak to avoid a retain cycle: the subscription already holds a strong
        //  reference to this publisher, and the downstream subscriber keeps the
        //  subscription alive.
        
        private weak var subscription       :   QueueingSubscription<Queueing<Upstream>, Upstream>?
        
        /**
            The upstream's completion, reported only once every enqueued
            value has been handed to the subscriber.
        */
        
        fileprivate var completion          :   Subscribers.Completion<Failure>?
        {
            return self.queue.isEmpty ? self.upstreamCompletion : nil
        }
        
        init(upstream inUpstream: Upstream, capacity inCapacity: Int)
        {
            self.upstream = inUpstream
            self.capacity = inCapacity
            
            //  Subscribe to the upstream right away so we can start
            //  enqueueing values. Keep the upstream subscription so we
            //  can cancel it when this publisher goes away.
            
            let sink = AnySubscriber { [weak self] (inSubscription: Subscription) in
                            self?.upstreamSubscription = inSubscription
                            inSubscription.request(.unlimited)
                        }
                        receiveValue:
                        { [weak self] (inValue: Output) -> Subscribers.Demand in
                            self?.relay(inValue)
                            return .none
                        }
                        receiveCompletion:
                        { [weak self] (inCompletion: Subscribers.Completion<Failure>) in
                            //  Don't complete the subscriber yet; it may still
                            //  have queued values to drain first.
                            
                            self?.upstreamCompletion = inCompletion
                            self?.upstreamSubscription = nil
                            self?.subscription?.dataAvailable()
                        }
            inUpstream.subscribe(sink)
        }
        
        deinit
        {
            self.upstreamSubscription?.cancel()
        }
        
        func
        receive<S: Subscriber>(subscriber inSubscriber: S)
            where
                Failure == S.Failure,
                Output == S.Input
        {
            let subscription = QueueingSubscription(publisher: self, subscriber: inSubscriber)
            self.subscription = subscription
            inSubscriber.receive(subscription: subscription)
            
            //  If the upstream already finished with nothing queued, deliver
            //  the completion now instead of waiting for demand.
            
            subscription.dataAvailable()
        }

        /**
            Return up to inDemand values.
        */
        
        func
        request(_ inDemand: Subscribers.Demand)
            -> [Output]
        {
            let count = inDemand.min(self.queue.count)
            let elements = Array(self.queue[..<count])
            self.queue.removeFirst(count)
            return elements
        }
        
        private
        func
        relay(_ inValue: Output)
        {
            //  Save the new value…
            
            self.queue.append(inValue)
            
            //  Discard the oldest if we’re over capacity…
            
            if self.queue.count > self.capacity
            {
                self.queue.removeFirst()
            }
            
            //  Send the buffer to our subscriber…
            
            self.subscription?.dataAvailable()
        }
            
    
            final
            class
            QueueingSubscription<QP, Upstream> : Subscription
                where
                    QP : Queueing<Upstream>
            {
                typealias Output = Upstream.Output
                typealias Failure = Upstream.Failure
                
                        let publisher           :   QP
                        var subscriber          :   AnySubscriber<Output,Failure>?              =   nil
                private var demand              :   Subscribers.Demand                          =   .none
                
                init<S>(publisher inP: QP,
                        subscriber inS: S)
                    where
                        S: Subscriber,
                        Failure == S.Failure,
                        Output == S.Input
                {
                    self.publisher = inP
                    self.subscriber = AnySubscriber(inS)
                }
                
                func
                request(_ inDemand: Subscribers.Demand)
                {
                    self.demand += inDemand
                    emitAsNeeded()
                }
                
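                func
                cancel()
                {
                    //  A cancelled subscriber shouldn't receive further events,
                    //  including a completion, so just drop our reference to it.
                    
                    self.subscriber = nil
                }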
                
                /**
                    Called by our publisher to let us know new
                    data has arrived.
                */
                
                func
                dataAvailable()
                {
                    emitAsNeeded()
                }
                
                private
                func
                emitAsNeeded()
                {
                    //  Pull one value at a time so that extra demand returned by
                    //  receive(_:), or requested re-entrantly, is honored in order.
                    
                    while self.demand > 0,
                          let subscriber = self.subscriber,
                          let value = self.publisher.request(.max(1)).first
                    {
                        self.demand -= 1
                        self.demand += subscriber.receive(value)
                    }
                    
                    if let completion = self.publisher.completion
                    {
                        complete(with: completion)
                    }
                }
                
                fileprivate
                func
                complete(with inCompletion: Subscribers.Completion<Failure>)
                {
                    guard let subscriber = self.subscriber else { return }
                    self.subscriber = nil
                    
                    subscriber.receive(completion: inCompletion)
                }
            }
        }
    }   //  extension Publishers
    
    
    extension
    Publisher
    {
        func
        enqueue<P>(gatedBy inGate: P, capacity inCapacity: Int = .max)
            -> AnyPublisher<Self.Output, Self.Failure>
            where
                P : Publisher,
                P.Output == Output,
                P.Failure == Failure
        {
            let qp = Publishers.Queueing(upstream: self, capacity: inCapacity)
            let r = qp.prepend(inGate).eraseToAnyPublisher()
            return r
        }
    }
    
    extension
    Subscribers.Demand
    {
        func
        min(_ inValue: Int)
            -> Int
        {
            if self == .unlimited
            {
                return inValue
            }
            
            return Swift.min(self.max!, inValue)
        }
    }
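    The queue inside Publishers.Queueing is a bounded FIFO, and its policy can be checked in isolation. This is a Combine-free sketch of the same rules that relay(_:) and request(_:) apply: append each value, discard the oldest once capacity is exceeded, and hand out at most the requested number of values. BoundedQueue and drain(upTo:) are hypothetical names used only for illustration, and a nil demand stands in for .unlimited:

    ```swift
    /// Hypothetical, Combine-free model of the queue in Publishers.Queueing.
    struct BoundedQueue<Element> {
        let capacity: Int
        private(set) var elements: [Element] = []

        init(capacity: Int) {
            precondition(capacity > 0, "capacity must be positive")
            self.capacity = capacity
        }

        /// Same rule as relay(_:): append, then discard the oldest value if over capacity.
        mutating func enqueue(_ value: Element) {
            elements.append(value)
            if elements.count > capacity {
                elements.removeFirst()
            }
        }

        /// Same rule as request(_:): remove and return up to `demand` values
        /// in FIFO order; `nil` means unlimited demand.
        mutating func drain(upTo demand: Int?) -> [Element] {
            precondition((demand ?? 0) >= 0, "demand must not be negative")
            let count = Swift.min(demand ?? elements.count, elements.count)
            let drained = Array(elements.prefix(count))
            elements.removeFirst(count)
            return drained
        }
    }

    var queue = BoundedQueue<Int>(capacity: 2)
    for value in [3, 4, 5] {
        queue.enqueue(value)              // 3 is discarded when 5 arrives
    }
    let first = queue.drain(upTo: 1)      // [4]
    let rest = queue.drain(upTo: nil)     // [5]
    print(first, rest, queue.elements.isEmpty)   // [4] [5] true
    ```

    With a capacity of 2, sending 3, 4 and 5 before anything subscribes leaves only 4 and 5 queued, which is what enqueue(gatedBy:capacity:) would replay after the gate's values.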