I have a stream of integers that I need to process using a structure that uses a CurrentValueSubject
to know whether or not it is allowed to do the processing. Since CurrentValueSubject
doesn't complete, if I do the processing inside a flatMap()
, the overall stream (fooTheBars
) only gets to complete if the stream of integers is empty.
If I comment out the processing part (.flatMap { bar in fooer.doFoo(with: bar) }
) the stream completes normally because the CurrentValueSubject
is not involved. But I need it to complete normally after all the items are processed regardless of the number of items.
Output I see:
[0, 1, 2]
doing foo with 0
doing foo with 1
doing foo with 2
received value
received value
received value
What I want it to output:
[0, 1, 2]
doing foo with 0
doing foo with 1
doing foo with 2
received value
received value
received value
received completion // hooray, completion!
Output when the stream is empty (completes normally which is what I want):
[]
received completion // hooray, completion!
If I comment out the processing (.flatMap()
), everything is fine:
[0, 1, 2]
received value
received value
received value
received completion // hooray, completion!
How do I modify the code below the comment so that the overall stream completes when all the items finish processing? Is using CurrentValueSubject
like that an inherently bad pattern?
struct Fooer {
private let readyToFoo = CurrentValueSubject<Bool, Never>(true)
func doFoo(with item: Int) -> AnyPublisher<Void, Never> {
readyToFoo
.filter { $0 }
.map { _ in () }
.handleEvents(receiveOutput: {
print("doing foo with \(item)") // processing
})
.delay(for: .seconds(1), scheduler: DispatchQueue.main)
.eraseToAnyPublisher()
}
}
func getBars() -> AnyPublisher<Int, Never> {
let bars = (0..<(Bool.random() ? 0 : 3))
print(Array(bars))
return bars.publisher
.eraseToAnyPublisher()
}
// can't change anything above this comment
let fooer = Fooer()
let fooTheBars = getBars()
.flatMap { bar in
fooer.doFoo(with: bar)
}
.sink(receiveCompletion: { completion in
print("received completion")
}, receiveValue: { _ in
print("received value")
})
If I understand correctly, you only want doFoo
to publish one value. i.e. the "processing" of a bar
is complete as soon as doFoo
publishes one value.
Therefore, you can stop the doFoo
publisher by just taking the first()
.
.flatMap { bar in
fooer.doFoo(with: bar).first()
}
This produces your intended output. And if readyToFoo
happens to hold false
when you call doFoo
, doFoo
won't publish. It is only when readToFoo
sends a true
, does doFoo
publishes its first value.