It seems like a combination of collect(every:on:skipEmpty:discardWhenCompleted:)
and collect(count:)
in ReactiveSwift.
The resulting signal would send an event every n seconds if the count of accumulated values doesn't reach max count during each time interval. But if in a specific time interval, the count of values has reached max count, it will send immediately.
For example, timeInterval = 2s, maxCount = 2
extension SignalProducer {
func collect(count: Int, every: DispatchTimeInterval, on scheduler: QueueScheduler) -> SignalProducer<[Value], Error> {
SignalProducer<[Value], Error> { (observer: Signal<[Value], Error>.Observer, lifetime: Lifetime) in
var collectedValues: [Value] = []
let disposable = CompositeDisposable()
disposable += self
.observe(on: scheduler)
.start(Signal<Value, Error>.Observer(value: { value in
collectedValues.append(value)
if collectedValues.count == count {
observer.send(value: collectedValues)
collectedValues.removeAll()
}
},
failed: { error in
observer.send(error: error)
},
completed: {
if collectedValues.count > 0 {
observer.send(value: collectedValues)
collectedValues.removeAll()
}
observer.sendCompleted()
},
interrupted: {
observer.sendInterrupted()
}))
disposable += SignalProducer<Date, Never>.timer(interval: every,
on: scheduler)
.observe(on: scheduler)
.startWithValues { _ in
observer.send(value: collectedValues)
collectedValues.removeAll()
}
lifetime.observeEnded {
disposable.dispose()
}
}
}
}
It collects an array of values until it reaches a certain count and then fires or fires every time interval.
Test:
var counter = 0
let valuesGenerator = SignalProducer.timer(interval: .milliseconds(550), on: QueueScheduler.main)
.filter { date in
Calendar.current.component(.second, from: date) % 3 == 0
}
.map { _ in
defer { counter += 1 }
return counter
}
.take(first: 20)
.promoteError(Error.self)
valuesGenerator
.collect(count: 3, every: .seconds(5), on: QueueScheduler.main)
.start(Signal<[Int], Error>.Observer(value: { values in
print("valuesGenerator, date: \(Date()), values: \(values)")
},
failed: { error in
print("valuesGenerator, date: \(Date()), error: \(error)")
},
completed: {
print("valuesGenerator, date: \(Date()), completed")
},
interrupted: {
print("valuesGenerator, date: \(Date()), interrupted")
}))
Output:
valuesGenerator, date: 2024-02-16 16:37:51 +0000, values: [0, 1, 2]
valuesGenerator, date: 2024-02-16 16:37:51 +0000, values: [3]
valuesGenerator, date: 2024-02-16 16:37:56 +0000, values: [4, 5]
valuesGenerator, date: 2024-02-16 16:38:00 +0000, values: [6, 7, 8]
valuesGenerator, date: 2024-02-16 16:38:01 +0000, values: [9]
valuesGenerator, date: 2024-02-16 16:38:06 +0000, values: [10, 11, 12]
valuesGenerator, date: 2024-02-16 16:38:06 +0000, values: []
valuesGenerator, date: 2024-02-16 16:38:11 +0000, values: [13, 14]
valuesGenerator, date: 2024-02-16 16:38:15 +0000, values: [15, 16, 17]
valuesGenerator, date: 2024-02-16 16:38:16 +0000, values: []
valuesGenerator, date: 2024-02-16 16:38:21 +0000, values: [18, 19]
valuesGenerator, date: 2024-02-16 16:38:21 +0000, completed