How to have a publisher emit only to the last subscriber in Combine


Is there a way to have the publisher emit a value only to the latest subscriber/observer?

For example, consider a manager class that multiple observers can subscribe to. When an event occurs, I would like only the most recent subscriber to be notified. As far as I know, a publisher has no way to keep track of its subscribers, but my knowledge of Combine and reactive programming is limited, so I am not sure whether this is possible in the first place.


Solution

  • You are right. Unfortunately, there is no way to list or track the subscribers of a publisher. To solve your problem, you have to build a custom publisher. There are two options: either you implement the Publisher protocol yourself, which Apple advises against (see here), or you compose a custom publisher from existing types, as Apple recommends. I have prepared an example of the second option.

    The logic is very simple. We create a class that wraps a PassthroughSubject (a CurrentValueSubject would also work). It exposes the send and sink methods you know from PassthroughSubject and forwards them to the wrapped subject. In the sink method we cache every subscription we hand out, BUT before adding a new subscription to the Set, we cancel all previously cached ones. This way only the last subscription stays active. The subscriptions are cached weakly, so the caller has to keep the returned AnyCancellable alive, as with any other sink.

    
    import Combine
    
    // The subscriptions will be cached in the publisher.
    // To avoid strong references, I use the WeakBox recommendation from the Swift forum.
    struct WeakBox<T: AnyObject & Hashable>: Hashable {
        weak var item: T?
    
        func hash(into hasher: inout Hasher) {
            hasher.combine(item)
        }
    }
    
    class MyPublisher<T, E: Error> {
        private let subject = PassthroughSubject<T, E>()
        private var subscriptions = Set<WeakBox<AnyCancellable>>()
        
        deinit {
            subscriptions.removeAll()
        }
        
        // Forward values to the wrapped subject.
        public func send(_ input: T) {
            subject.send(input)
        }
        
        // Forward completion to the wrapped subject.
        public func send(completion: Subscribers.Completion<E>) {
            subject.send(completion: completion)
        }
        
        public func sink(receiveCompletion receivedCompletion: @escaping (Subscribers.Completion<E>) -> Void, receiveValue receivedValue: @escaping (T) -> Void) -> AnyCancellable {
            let subscription = subject
                .sink(receiveCompletion: receivedCompletion, receiveValue: receivedValue)
            
            // Cancel previous subscriptions.
            subscriptions.forEach { $0.item?.cancel() }
            
            // Drop the cancelled boxes so the set does not grow with every call.
            // A box's hash changes once its item is deallocated, so stale boxes should not stay in a Set.
            subscriptions.removeAll()
            
            // Add new subscription.
            subscriptions.insert(WeakBox(item: subscription))
            
            return subscription
        }
    }
    

    I tested the class in a Playground as follows.

    let publisher = MyPublisher<Int, Never>()
    
    let firstSubscription = publisher
        .sink(receiveCompletion: { completion in
            print("1st subscription completion \(completion)")
        }, receiveValue: { value in
            print("1st subscription value \(value)")
        })
    
    let secondSubscription = publisher
        .sink(receiveCompletion: { completion in
            print("2nd subscription completion \(completion)")
        }, receiveValue: { value in
            print("2nd subscription value \(value)")
        })
    
    let thirdSubscription = publisher
        .sink(receiveCompletion: { completion in
            print("3rd subscription completion \(completion)")
        }, receiveValue: { value in
            print("3rd subscription value \(value)")
        })
    
    publisher.send(123)
    
    

    Console output:

    3rd subscription value 123
    

    If you comment out the line subscriptions.forEach { $0.item?.cancel() }, all three subscribers receive the value:

    3rd subscription value 123
    1st subscription value 123
    2nd subscription value 123
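
    Since only the most recent subscription ever has to be cancelled, a Set of weak boxes may be more than is strictly needed. As a rough sketch under that assumption, a single weak reference could do the same job. The name LatestOnlyPublisher is hypothetical and not part of Combine, and the sketch makes no attempt at thread safety.

    ```swift
    import Combine

    // Hypothetical variant: remembers only the most recent subscription.
    final class LatestOnlyPublisher<Output, Failure: Error> {
        private let subject = PassthroughSubject<Output, Failure>()

        // Weak, so the caller stays the only owner of the subscription.
        private weak var latest: AnyCancellable?

        func send(_ input: Output) {
            subject.send(input)
        }

        func send(completion: Subscribers.Completion<Failure>) {
            subject.send(completion: completion)
        }

        func sink(receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
                  receiveValue: @escaping (Output) -> Void) -> AnyCancellable {
            // Cancel the previous subscription, if it is still alive.
            latest?.cancel()

            let subscription = subject
                .sink(receiveCompletion: receiveCompletion, receiveValue: receiveValue)
            latest = subscription
            return subscription
        }
    }

    // Usage: keep the returned AnyCancellable values alive, e.g. in properties.
    let latestOnly = LatestOnlyPublisher<Int, Never>()
    let first = latestOnly.sink(receiveCompletion: { _ in },
                                receiveValue: { print("first \($0)") })
    let second = latestOnly.sink(receiveCompletion: { _ in },
                                 receiveValue: { print("second \($0)") })
    latestOnly.send(1)
    // prints "second 1"
    ```

    As in the version above, a subscription whose AnyCancellable is released by the caller is cancelled automatically, so the weak reference never keeps a dead subscription around.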
    
    

    I hope this helps.