I have a SignalProducer
producer
that asynchronously sends Int
s. I can sum the values with
producer.scan(0, +)
Suppose I want to reset the sum to 0
if it is > 10
and no other values have been sent for 1 second. My first attempt looked like this:
producer
.scan(0, +)
.flatMap(.latest) { n -> SignalProducer<Int, NoError> in
if n <= 10 {
return SignalProducer(value: n)
} else {
return SignalProducer.merge([
SignalProducer(value: n),
SignalProducer(value: 0).delay(1, on: QueueScheduler.main)
])
}
}
While this correctly sends 0
, it doesn't reset the state in scan
. That is, a sequence of 9, 8, long pause, 7
sends 9, 17, 0, 24
.
Is there a way to combine these two concept in a way that correctly resets the state?
I would use startWithSignal
to gain access to the produced Signal
in order to create a reset trigger to merge with the incoming values. I'm using an enum as my value to indicate whether the value coming into the scan
is a reset trigger or an Int
to accumulate.
enum ValOrReset {
case val(Int)
case reset
}
producer.startWithSignal { signal, _ in
resetTrigger = signal
.debounce(TimeInterval(1.0), on: QueueScheduler.main)
.map { _ in .reset }
signal
.map { val in .val(val) }
.merge(with: resetTrigger)
.scan(0) { (state, next) in
switch next {
case .val(let val):
return state + val
case .reset:
if state > 10 {
return 0
}
else {
return state
}
}
}
.observeValues { val in
print(val)
}
}
The way startWithSignal
works is that it ensures no values will come until the closure is finished, which means you can create multiple downstream signals and wire them together without worrying about missing any values, even if the producer sends values synchronously.