This might be a trivial question, but I'm unable to find a solution to this seemingly easy task. As I'm new to ReactiveSwift and reactive programming, I might simply be missing something obvious.
Basically what I want to do is something like this:
signal.collect(timeInterval: .seconds(5))
I want to collect all values over a specific period of time from a signal. The resulting signal would produce an event every x seconds which would contain an array of the collected events from the first signal.
What is the best approach to do this in ReactiveSwift?
There's no built-in operator in ReactiveSwift for this task. Instead, you can write an extension along the following lines:
import Foundation
import ReactiveSwift
import Result

public extension Signal {
    // Forwards all events from self, but completes once `after` seconds have elapsed.
    public func completeAfter(after: TimeInterval, onScheduler: DateSchedulerProtocol = QueueScheduler()) -> Signal {
        let pipe: (Signal<(), NoError>, ReactiveSwift.Observer<(), NoError>) = Signal<(), NoError>.pipe()
        // When the deadline passes, terminate the trigger signal.
        onScheduler.schedule(after: Date(timeIntervalSinceNow: after)) {
            pipe.1.sendCompleted()
        }
        return Signal { observer in
            return self.observe { event in
                switch event {
                case let .value(value):
                    observer.send(value: value)
                case .completed:
                    observer.sendCompleted()
                case let .failed(error):
                    observer.send(error: error)
                case .interrupted:
                    observer.sendInterrupted()
                }
            }
        }.take(until: pipe.0)
    }

    // Collects every value received before the deadline into a single array.
    public func collectUntil(until: TimeInterval) -> Signal<[Value], Error> {
        return self.completeAfter(after: until).collect()
    }
}
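Then use it as signal.collectUntil(until: 5). The resulting signal emits a single array once the interval has elapsed and then completes.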
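To see the mechanism in isolation, here is a minimal sketch of the take(until:) plus collect() combination that the extension is built on. The trigger pipe stands in for the timer and is fired by hand; the names values, trigger and batches are illustrative only, and the NoError type and Result import assume the same ReactiveSwift version as the code above.

```swift
import ReactiveSwift
import Result  // assumption: a ReactiveSwift version that takes NoError from the Result package

// Source of values and a manually fired trigger (stand-in for the timer).
let (values, valuesObserver) = Signal<Int, NoError>.pipe()
let (trigger, triggerObserver) = Signal<(), NoError>.pipe()

// Every emitted array is recorded here so the result can be inspected.
var batches: [[Int]] = []

values
    .take(until: trigger)   // complete as soon as the trigger fires
    .collect()              // emit everything received so far as one array
    .observeValues { batch in batches.append(batch) }

valuesObserver.send(value: 1)
valuesObserver.send(value: 2)
triggerObserver.send(value: ())   // batches now contains [1, 2]
valuesObserver.send(value: 3)     // arrives after completion and is dropped
```

Because collect() only emits when its upstream completes, the array is delivered exactly once, at the moment the trigger fires.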
Another way is to use the timer function from ReactiveSwift. Example (add it to the same extension as above):
public func collectUntil2(until: TimeInterval) -> Signal<[Value], Error> {
    var signal: Signal<(), NoError>? = nil
    // startWithSignal invokes its closure synchronously, so `signal` is set before the return below.
    timer(interval: until, on: QueueScheduler()).startWithSignal { innerSignal, _ in
        signal = innerSignal.map { _ in () }.take(first: 1)
    }
    return self.take(until: signal!).collect()
}
I, however, don't like this approach, because it fakes the nature of the SignalProducer type by extracting its inner signal.
The Signal type itself also has a timeout function; however, it is awkward to use here because it produces an error. Example of how to use it (again, add it to the same extension):
// Turns a failure into a normal completion so that collect() can still emit its array.
public func completeOnError() -> Signal<Value, Error> {
    return Signal { observer in
        return self.observe { event in
            switch(event) {
            case .value(let v): observer.send(value: v)
            case .failed(_): observer.sendCompleted()
            case .interrupted: observer.sendInterrupted()
            case .completed: observer.sendCompleted()
            }
        }
    }
}

public func collectUntil3(until: TimeInterval) -> Signal<[Value], Error> {
    return self
        // Note: the forced cast traps unless the signal's Error type is compatible with NSError.
        .timeout(after: until,
                 raising: NSError() as! Error,
                 on: QueueScheduler())
        .completeOnError()
        .collect()
}
PS: whichever of the three options you choose, make sure to pass the correct scheduler, or parametrize your solution with a scheduler.
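One reason to parametrize the scheduler is that the deadline can then be driven by virtual time in tests. The following is only a sketch under that assumption: it uses ReactiveSwift's TestScheduler in place of QueueScheduler(), and the names deadline, collected and countBeforeDeadline are made up for illustration. It again assumes a ReactiveSwift version that uses NoError from the Result package.

```swift
import Foundation
import ReactiveSwift
import Result  // assumption: a ReactiveSwift version that takes NoError from the Result package

// A scheduler whose clock only moves when the test tells it to.
let scheduler = TestScheduler()

let (values, valuesObserver) = Signal<Int, NoError>.pipe()
let (deadline, deadlineObserver) = Signal<(), NoError>.pipe()

// Fire the deadline on the injected scheduler, 5 seconds of virtual time from now.
scheduler.schedule(after: scheduler.currentDate.addingTimeInterval(5)) {
    deadlineObserver.send(value: ())
}

var collected: [[Int]] = []
values
    .take(until: deadline)
    .collect()
    .observeValues { batch in collected.append(batch) }

valuesObserver.send(value: 1)
valuesObserver.send(value: 2)

// Virtual time has not advanced yet, so no array has been emitted.
let countBeforeDeadline = collected.count

// Move the clock to the deadline; the scheduled action runs and the batch is emitted.
scheduler.advance(to: scheduler.currentDate.addingTimeInterval(5))
```

With a hard-coded QueueScheduler() the same check would have to wait five real seconds on a background queue.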