Search code examples
swiftcombine

How do I cancel a combine subscription within a sink?


I have a somewhat complicated architecture for a feature in my app.

Sample code is below. My original expectation was that this would only print once, because I call cancellableSet.removeAll(). But this actually ends up being called twice, which creates problems in my application.

How do I get this so it only fires what's in the sink after the subscription is stored in the cancellable set.

Note that I have a few restrictions here that I'll mention. My sample code is just simplifying this.

  • Can't use a take or drop operation, as this may get called an undetermined amount of times.
import Combine

enum State {
    case loggedOut
    case doingSomething
}

let aState = CurrentValueSubject<State, Never>(.doingSomething)
private var cancellableSet: Set<AnyCancellable> = []

func logUserOut() {
    cancellableSet.removeAll()
    aState.send(.loggedOut)
}

func doSomethingElse() { }
aState.sink { newState in
    print("numberOfSubscriptions is: \(cancellableSet.count)")
    switch newState {
    case .loggedOut:
        doSomethingElse()
    case .doingSomething:
        logUserOut()
    }
    
}
.store(in: &cancellableSet)

Solution

  • The problem in your code is that the subscription starts delivering values synchronously before the call to sink returns, and so before the call to store even begins.

    One way to solve this is to turn aState into a ConnectablePublisher before subscribing. A ConnectablePublisher doesn't publish until its connect method is called. So call connect after store returns.

    You can use the makeConnectable method on any Publisher whose Failure == Never to wrap it in a ConnectablePublisher.

    let connectable = aState.makeConnectable()
    connectable.sink { newState in
        print("numberOfSubscriptions is: \(cancellableSet.count)")
        switch newState {
        case .loggedOut:
            doSomethingElse()
        case .doingSomething:
            logUserOut()
        }
    }
    .store(in: &cancellableSet)
    connectable.connect()