Search code examples
swiftswift3reactive-cocoareactive-swift

Signal: Collect values over time interval


This might be a trivial question but I'm unable to find a solution to this seemingly easy task. As I'm new to ReactiveSwift and reactive programming I might simply miss something obvious.

Basically what I want to do is something like this:

signal.collect(timeInterval: .seconds(5))

I want to collect all values over a specific period of time from a signal. The resulting signal would produce an event every x seconds which would contain an array of the collected events from the first signal.

What is the best approach to do this in ReactiveSwift?


Solution

  • There's no built-in operator in ReactiveSwift for this task. Instead, you can use following approach, writing an extension:

    import Foundation
    import ReactiveSwift
    import Result
    public extension Signal {
        public func completeAfter(after: TimeInterval, onScheduler : DateSchedulerProtocol = QueueScheduler() ) -> Signal {
            let pipe : (Signal<(), NoError>, ReactiveSwift.Observer<(), NoError>) = Signal<(), NoError>.pipe()
            onScheduler.schedule(after: Date(timeIntervalSinceNow: after)) {
                pipe.1.sendCompleted()
            }
            return Signal { observer in
                return self.observe { event in
                    switch event {
                    case let .value(value):
                        observer.send(value: value)
                    case .completed:
                        observer.sendCompleted()
                    case let .failed(error):
                        observer.send(error: error)
                    case .interrupted:
                        observer.sendInterrupted()
                    }
                }
            }.take(until: pipe.0)
        }
        
        public func collectUntil(until: TimeInterval) -> Signal<[Value], Error> {
            return self.completeAfter(after: until).collect()
        }
    }
    

    And then use signal.collectUntil(5) method.

    Another way is to use timer function from ReactiveSwift. Example (add to same extension, as above):

    public func collectUntil2(until: TimeInterval) -> Signal<[Value], Error> {
        var signal: Signal<(), NoError>? = nil
        timer(interval: until, on: QueueScheduler()).startWithSignal { innerSignal, _ in
            signal = innerSignal.map { _ in () }.take(first: 1)
        }
        return self.take(until: signal!).collect()
    }
    

    I, however, don't like this approach, because it's faking nature of SignalProducer type extracting inner signal.

    Signal type itself has also timeout function, however it will be difficult to use it since it's error-producing. Example of how to use it (still, add to the same extension):

    public func completeOnError() -> Signal<Value, Error> {
        return Signal { observer in
            return self.observe { event in
                switch(event) {
                case .value(let v): observer.send(value: v)
                case .failed(_): observer.sendCompleted()
                case .interrupted: observer.sendInterrupted()
                case .completed: observer.sendCompleted()
                }
            }
        }
    }
    
    public func collectUntil3(until: TimeInterval) -> Signal<[Value], Error> {
        return self
            .timeout(after: until,
                     raising: NSError() as! Error,
                     on: QueueScheduler())
            .completeOnError()
            .collect()
    }
    

    PS: by choosing any of 3 options, mind passing correct scheduler or parametrizing your solution with correct scheduler.