I have a situation where my code needs to make one network call to fetch a bunch of items, but while waiting for those to come down, another network call might fetch an update to those items. I'd love to be able to enqueue those secondary results until the first one has finished. Is there a way to accomplish that with Combine?
Importantly, I am not able to wait before making the second request. It’s actually a connection to a websocket that gets made at the same time as the first request, and the updates come over the websocket outside of my control.
After examining Matt’s thorough book on Combine, I settled on .prepend(). But as Matt warned me in the comments, .prepend() doesn’t even subscribe to the other publisher until after the first one completes. This means I miss any signals sent prior to that. What I need is a Subject that enqueues values, but perhaps that’s not so hard to make. Anyway, this is where I got:
Initially I was going to use .append(), but I realized that with .prepend() I could avoid keeping a reference to one of the publishers. So here’s a simplified version of what I’ve got. There might be syntax errors in this, as I’ve whittled it down from my (employer’s) code.
There’s the ItemFeed, which handles fetching a list of items and simultaneously handling item update events. The latter can arrive before the initial list of items, and thus must be sequenced via Combine to arrive after it. I attempt to do this by prepending the initial items source to the update PassthroughSubject.
Below that is an XCTestCase that simulates a lengthy initial item load, and adds an update before that load can complete. It subscribes to changes to the list of items, and tries to test that the first update is the initial 63 items, and the subsequent update is for 64 items (in this case, “update” results in adding an item).
Unfortunately, while the initial list is published, the update never arrives. I also tried removing the .output(at:) operators, but the two sinks are only called once.
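One detail worth checking when indexing into $items with .output(at:): as far as I know, the publisher of a @Published property emits the current value to each new subscriber, so index 0 would be the initial empty array rather than the fetched list. A minimal sketch of that behavior follows; Counter and observedCounts() are hypothetical names used only for illustration and are not part of the original code.

```swift
import Combine

// Hypothetical stand-in for ItemFeed; only the @Published property matters here.
final class Counter {
    @Published var values = [Int]()
}

// Records the element count of every array delivered by $values.
func observedCounts() -> [Int] {
    let counter = Counter()
    var counts = [Int]()

    // The sink runs synchronously; the current (empty) value arrives on subscribe.
    let sub = counter.$values.sink { counts.append($0.count) }

    counter.values = [1, 2, 3]   // delivered as the second element
    counter.values.append(4)     // delivered as the third element

    sub.cancel()
    return counts
}
```

If that assumption holds, inserting .dropFirst() before .output(at:) would skip the initial empty array, so the indices line up with the real updates.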
After the test case sets up the delayed “fetch” and subscribes to changes in feed.items, it calls feed.handleItemUpdatedEvent. This calls ItemFeed.updateItems.send(_:), but unfortunately that value is lost to oblivion.
import Combine

class
ItemFeed
{
    typealias InitialItemsSource = Deferred<Future<[[String : Any]], Error>>

    let updateItems = PassthroughSubject<[Item], Error>()
    var funnel : AnyCancellable?

    @Published var items = [Item]()

    init(initialItemSource inSource: InitialItemsSource)
    {
        // Passthrough subject each time items are updated…
        var pub = self.updateItems.eraseToAnyPublisher()

        // Prepend the initial items we need to fetch…
        let initialItems = inSource.tryMap { try $0.map { try Item(object: $0) } }
        pub = pub.prepend(initialItems).eraseToAnyPublisher()

        // Sink on the funnel to add or update to self.items…
        self.funnel =
            pub.sink { inCompletion in
                // Handle errors
            }
            receiveValue: { [weak self] inItems in
                // Capture self weakly so the stored sink does not retain the feed
                self?.update(items: inItems)
            }
    }

    func handleItemUpdatedEvent(_ inItem: Item) {
        self.updateItems.send([inItem])
    }

    func update(items inItems: [Item]) {
        // Update or add inItems to self.items
    }
}
import Combine
import XCTest

class
ItemFeedTests : XCTestCase
{
    func
    testShouldUpdateItems()
        throws
    {
        // Set up a mock source of items…
        let source = fetchItems(named: "items", delay: 3.0)    // 63 items

        let expectation = XCTestExpectation(description: "testShouldUpdateItems")
        expectation.expectedFulfillmentCount = 2

        let feed = ItemFeed(initialItemSource: source)

        let sub1 = feed.$items
            .output(at: 0)
            .receive(on: DispatchQueue.main)
            .sink { inItems in
                expectation.fulfill()
                debugLog("Got first items: \(inItems.count)")
                XCTAssertEqual(inItems.count, 63)
            }

        let sub2 = feed.$items
            .output(at: 1)
            .receive(on: DispatchQueue.main)
            .sink { inItems in
                expectation.fulfill()
                debugLog("Got second items: \(inItems.count)")
                XCTAssertEqual(inItems.count, 64)
            }

        // Send an update right away…
        let item = try loadItem(named: "Item3")
        feed.handleItemUpdatedEvent(item)
        XCTAssertEqual(feed.items.count, 0)    // Should be no items yet

        // Wait for stuff to complete…
        wait(for: [expectation], timeout: 10.0)

        sub1.cancel()    // Not necessary, but silence the compiler warning
        sub2.cancel()
    }
}
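The lost update can be reproduced without the feed or the test harness. The sketch below uses two plain subjects, with gate standing in for the initial fetch and stream standing in for the websocket updates; valuesSeenThroughPrepend() is a hypothetical helper written only for this illustration.

```swift
import Combine

// Hypothetical helper for illustration; not part of the original code.
func valuesSeenThroughPrepend() -> [Int] {
    let gate = PassthroughSubject<Int, Never>()     // stands in for the initial fetch
    let stream = PassthroughSubject<Int, Never>()   // stands in for the websocket updates
    var results = [Int]()

    let sub = stream.prepend(gate).sink { results.append($0) }

    stream.send(3)                      // nothing is subscribed to stream yet, so this is dropped
    gate.send(1)                        // delivered: prepend is subscribed to gate
    gate.send(completion: .finished)    // only now does prepend subscribe to stream
    stream.send(4)                      // delivered

    sub.cancel()
    return results
}
```

The value 3 plays the role of the early websocket update: a PassthroughSubject does not hold on to values sent while it has no subscriber, so only 1 and 4 reach the sink.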
After a fair bit of trial and error, I found a solution. I created a custom Publisher and Subscription that immediately subscribes to its upstream publisher and begins enqueuing elements (up to some specifiable capacity). It then waits for a subscriber to come along, provides that subscriber with all the values received so far, and then continues providing values as they arrive. Here’s a marble diagram:
[Figure: marble diagram of the queueing publisher]
I then use this in conjunction with .prepend() like so:
extension
Publisher
{
    func
    enqueue<P>(gatedBy inGate: P, capacity inCapacity: Int = .max)
        -> AnyPublisher<Self.Output, Self.Failure>
        where
            P : Publisher,
            P.Output == Output,
            P.Failure == Failure
    {
        let qp = Publishers.Queueing(upstream: self, capacity: inCapacity)
        let r = qp.prepend(inGate).eraseToAnyPublisher()
        return r
    }
}
And this is how you use it…
func
testShouldReturnAllItemsInOrder()
{
    let gate = PassthroughSubject<Int, Never>()
    let stream = PassthroughSubject<Int, Never>()

    var results = [Int]()
    let sub = stream.enqueue(gatedBy: gate)
        .sink
        { inElement in
            debugLog("element: \(inElement)")
            results.append(inElement)
        }

    stream.send(3)
    stream.send(4)
    stream.send(5)
    XCTAssertEqual(results.count, 0)

    gate.send(1)
    gate.send(2)
    gate.send(completion: .finished)
    XCTAssertEqual(results.count, 5)
    XCTAssertEqual(results, [1,2,3,4,5])

    sub.cancel()
}
This prints what you would expect:
element: 1
element: 2
element: 3
element: 4
element: 5
It works well because creating the .enqueue(gatedBy:) operator creates the queuing publisher qp, which immediately subscribes to stream and enqueues any values it sends. It then calls .prepend() on qp, which first subscribes to gate and waits for it to complete. When gate finishes, .prepend() subscribes to qp, which immediately provides it with all the enqueued values, and then continues to provide it with values from the upstream publisher.
Here’s the code I finally ended up with.
//
//  QueuingPublisher.swift
//  Latency: Zero, LLC
//
//  Created by Rick Mann on 2021-06-03.
//

import Combine
import Foundation

extension
Publishers
{
    final
    class
    Queueing<Upstream: Publisher>: Publisher
    {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure

        private let upstream : Upstream
        private let capacity : Int
        private var queue : [Output] = [Output]()
        private var subscription : QueueingSubscription<Queueing<Upstream>, Upstream>?
        fileprivate var completion : Subscribers.Completion<Failure>? = nil

        init(upstream inUpstream: Upstream, capacity inCapacity: Int)
        {
            self.upstream = inUpstream
            self.capacity = inCapacity

            // Subscribe to the upstream right away so we can start
            // enqueueing values…
            let sink = AnySubscriber { $0.request(.unlimited) }
                receiveValue:
                { [weak self] (inValue: Output) -> Subscribers.Demand in
                    self?.relay(inValue)
                    return .none
                }
                receiveCompletion:
                { [weak self] (inCompletion: Subscribers.Completion<Failure>) in
                    self?.completion = inCompletion
                    self?.subscription?.complete(with: inCompletion)
                }
            inUpstream.subscribe(sink)
        }

        func
        receive<S: Subscriber>(subscriber inSubscriber: S)
            where
                Failure == S.Failure,
                Output == S.Input
        {
            let subscription = QueueingSubscription(publisher: self, subscriber: inSubscriber)
            self.subscription = subscription
            inSubscriber.receive(subscription: subscription)
        }

        /**
            Return up to inDemand values, removing them from the queue.
        */

        func
        request(_ inDemand: Subscribers.Demand)
            -> [Output]
        {
            let count = inDemand.min(self.queue.count)
            let elements = Array(self.queue[..<count])
            self.queue.removeFirst(count)
            return elements
        }

        private
        func
        relay(_ inValue: Output)
        {
            // TODO: The Wenderlich example code checks to see if the upstream has completed,
            //       but I feel like I want to send all the values we've gotten first?

            // Save the new value…
            self.queue.append(inValue)

            // Discard the oldest if we’re over capacity…
            if self.queue.count > self.capacity
            {
                self.queue.removeFirst()
            }

            // Send the buffer to our subscriber…
            self.subscription?.dataAvailable()
        }

        final
        class
        QueueingSubscription<QP, Upstream> : Subscription
            where
                QP : Queueing<Upstream>
        {
            typealias Output = Upstream.Output
            typealias Failure = Upstream.Failure

            let publisher : QP
            var subscriber : AnySubscriber<Output,Failure>? = nil
            private var demand : Subscribers.Demand = .none

            init<S>(publisher inP: QP,
                    subscriber inS: S)
                where
                    S: Subscriber,
                    Failure == S.Failure,
                    Output == S.Input
            {
                self.publisher = inP
                self.subscriber = AnySubscriber(inS)
            }

            func
            request(_ inDemand: Subscribers.Demand)
            {
                self.demand += inDemand
                emitAsNeeded()
            }

            func
            cancel()
            {
                // Drop the subscriber without sending a completion; a cancelled
                // subscriber should not receive any further events.
                self.subscriber = nil
            }

            /**
                Called by our publisher to let us know new
                data has arrived.
            */

            func
            dataAvailable()
            {
                emitAsNeeded()
            }

            private
            func
            emitAsNeeded()
            {
                guard let subscriber = self.subscriber else { return }

                // Pull as many queued values as the current demand allows…
                let newValues = self.publisher.request(self.demand)
                self.demand -= newValues.count
                newValues.forEach
                {
                    let nextDemand = subscriber.receive($0)
                    self.demand += nextDemand
                }

                // Forward the upstream completion if one has already arrived…
                if let completion = self.publisher.completion
                {
                    complete(with: completion)
                }
            }

            fileprivate
            func
            complete(with inCompletion: Subscribers.Completion<Failure>)
            {
                guard let subscriber = self.subscriber else { return }
                self.subscriber = nil

                subscriber.receive(completion: inCompletion)
            }
        }
    }
}    // extension Publishers

extension
Publisher
{
    func
    enqueue<P>(gatedBy inGate: P, capacity inCapacity: Int = .max)
        -> AnyPublisher<Self.Output, Self.Failure>
        where
            P : Publisher,
            P.Output == Output,
            P.Failure == Failure
    {
        let qp = Publishers.Queueing(upstream: self, capacity: inCapacity)
        let r = qp.prepend(inGate).eraseToAnyPublisher()
        return r
    }
}

extension
Subscribers.Demand
{
    func
    min(_ inValue: Int)
        -> Int
    {
        if self == .unlimited
        {
            return inValue
        }

        return Swift.min(self.max!, inValue)
    }
}
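The capacity and demand handling inside the publisher (relay(_:) dropping the oldest value once over capacity, and request(_:) handing out at most as many values as were asked for) can be looked at in isolation, without any Combine types. This is only a sketch: BoundedQueue and take(upTo:) are hypothetical names, and a nil limit is assumed to mean unlimited demand, mirroring Subscribers.Demand.max being nil for .unlimited.

```swift
// Hypothetical, Combine-free model of the queue inside Publishers.Queueing.
struct BoundedQueue<Element> {
    private(set) var elements = [Element]()
    let capacity: Int

    init(capacity: Int = .max) {
        precondition(capacity > 0, "capacity must be positive")
        self.capacity = capacity
    }

    // Mirrors relay(_:): save the new value, then discard the oldest if over capacity.
    mutating func append(_ element: Element) {
        elements.append(element)
        if elements.count > capacity {
            elements.removeFirst()
        }
    }

    // Mirrors request(_:): remove and return up to `limit` values, oldest first.
    // A nil limit is treated as unlimited demand (an assumption of this sketch).
    mutating func take(upTo limit: Int?) -> [Element] {
        let count = max(0, min(limit ?? elements.count, elements.count))
        let taken = Array(elements.prefix(count))
        elements.removeFirst(count)
        return taken
    }
}
```

With a capacity of 3, appending 1 through 5 leaves 3, 4 and 5 in the queue. The design choice to drop the oldest values first means a slow gate costs you early updates, not recent ones; whether that is the right trade-off depends on the feed.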