This might be a trivial question but I'm unable to find a solution to this seemingly easy task. As I'm new to ReactiveSwift and reactive programming I might simply miss something obvious.
Basically what I want to do is something like this:
signal.collect(timeInterval: .seconds(5))
I want to collect all values over a specific period of time from a signal. The resulting signal would produce an event every x seconds which would contain an array of the collected events from the first signal.
What is the best approach to do this in ReactiveSwift?
There's no built-in operator in ReactiveSwift for this task. Instead, you can use following approach, writing an extension:
import Foundation
import ReactiveSwift
import Result
public extension Signal {
public func completeAfter(after: TimeInterval, onScheduler : DateSchedulerProtocol = QueueScheduler() ) -> Signal {
let pipe : (Signal<(), NoError>, ReactiveSwift.Observer<(), NoError>) = Signal<(), NoError>.pipe()
onScheduler.schedule(after: Date(timeIntervalSinceNow: after)) {
pipe.1.sendCompleted()
}
return Signal { observer in
return self.observe { event in
switch event {
case let .value(value):
observer.send(value: value)
case .completed:
observer.sendCompleted()
case let .failed(error):
observer.send(error: error)
case .interrupted:
observer.sendInterrupted()
}
}
}.take(until: pipe.0)
}
public func collectUntil(until: TimeInterval) -> Signal<[Value], Error> {
return self.completeAfter(after: until).collect()
}
}
And then use signal.collectUntil(5)
method.
Another way is to use timer
function from ReactiveSwift. Example (add to same extension, as above):
public func collectUntil2(until: TimeInterval) -> Signal<[Value], Error> {
var signal: Signal<(), NoError>? = nil
timer(interval: until, on: QueueScheduler()).startWithSignal { innerSignal, _ in
signal = innerSignal.map { _ in () }.take(first: 1)
}
return self.take(until: signal!).collect()
}
I, however, don't like this approach, because it's faking nature of SignalProducer
type extracting inner signal.
Signal
type itself has also timeout
function, however it will be difficult to use it since it's error-producing. Example of how to use it (still, add to the same extension):
public func completeOnError() -> Signal<Value, Error> {
return Signal { observer in
return self.observe { event in
switch(event) {
case .value(let v): observer.send(value: v)
case .failed(_): observer.sendCompleted()
case .interrupted: observer.sendInterrupted()
case .completed: observer.sendCompleted()
}
}
}
}
public func collectUntil3(until: TimeInterval) -> Signal<[Value], Error> {
return self
.timeout(after: until,
raising: NSError() as! Error,
on: QueueScheduler())
.completeOnError()
.collect()
}
PS: by choosing any of 3 options, mind passing correct scheduler or parametrizing your solution with correct scheduler.