Search code examples
swiftcombineflatmap

How do you artificially complete a stream?


I have a stream of integers that I need to process using a structure that uses a CurrentValueSubject to know whether or not it is allowed to do the processing. Since CurrentValueSubject doesn't complete, if I do the processing inside a flatMap(), the overall stream (fooTheBars) only gets to complete if the stream of integers is empty.

If I comment out the processing part (.flatMap { bar in fooer.doFoo(with: bar) }) the stream completes normally because the CurrentValueSubject is not involved. But I need it to complete normally after all the items are processed regardless of the number of items.

Output I see:

[0, 1, 2]
doing foo with 0
doing foo with 1
doing foo with 2
received value
received value
received value

What I want it to output:

[0, 1, 2]
doing foo with 0
doing foo with 1
doing foo with 2
received value
received value
received value
received completion // hooray, completion!

Output when the stream is empty (completes normally which is what I want):

[]
received completion // hooray, completion!

If I comment out the processing (.flatMap()), everything is fine:

[0, 1, 2]
received value
received value
received value
received completion // hooray, completion!

How do I modify the code below the comment so that the overall stream completes when all the items finish processing? Is using CurrentValueSubject like that an inherently bad pattern?

struct Fooer {
    private let readyToFoo = CurrentValueSubject<Bool, Never>(true)

    func doFoo(with item: Int) -> AnyPublisher<Void, Never> {
        readyToFoo
            .filter { $0 }
            .map { _ in () }
            .handleEvents(receiveOutput: {
                print("doing foo with \(item)") // processing
            })
            .delay(for: .seconds(1), scheduler: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
}

func getBars() -> AnyPublisher<Int, Never> {
    let bars = (0..<(Bool.random() ? 0 : 3))
    print(Array(bars))
    return bars.publisher
        .eraseToAnyPublisher()
}

// can't change anything above this comment

let fooer = Fooer()
let fooTheBars = getBars()
    .flatMap { bar in
        fooer.doFoo(with: bar)
    }
    .sink(receiveCompletion: { completion in
        print("received completion")
    }, receiveValue: { _ in
        print("received value")
    })

Solution

  • If I understand correctly, you only want doFoo to publish one value. i.e. the "processing" of a bar is complete as soon as doFoo publishes one value.

    Therefore, you can stop the doFoo publisher by just taking the first().

    .flatMap { bar in
        fooer.doFoo(with: bar).first()
    }
    

    This produces your intended output. And if readyToFoo happens to hold false when you call doFoo, doFoo won't publish. It is only when readToFoo sends a true, does doFoo publishes its first value.