
Creating a "reactive" API with RxSwift


I'm dipping my toes into RxSwift and would like to create a "streaming API" for one of my regular API calls.

My idea is to take the regular call (which already uses observables without any problems) and have a timer fire it repeatedly, sending the results on the same observable so the view controller can update automatically. So instead of doing this (pseudocode follows):

func getLocations() -> Observable<[Location]> {
  return Observable<[Location]>.create {
    sink in
    NSURLSession.sharedSession.rx_JSON(API.locationsRequest).map {
       json in
       return json.flatMap { Location($0) }
    }
  }
}

I'd like for this to happen (pseudocode follows):

func getLocations(interval: NSTimeInterval) -> Observable<[Location]> {
  return Observable<[Location]>.create {
    sink in
    NSTimer(interval) {
      NSURLSession.sharedSession.rx_JSON(API.locationsRequest).map {
        json in
        sink.onNext(json.flatMap { Location($0) })
      }
    }
  }
}

The last thing I tried was adding an NSTimer to the mix, but the timer's handler must be a standalone method, and I can't figure out how to pass the reference to the sink to that method so it can actually send the events down the pipe. I also tried the block-based timer extensions from BlocksKit, but the timer fired every second instead of at the specified interval, which defeated the purpose.

I've also read about the Interval operator but I'm not sure it's the right way to go.

Any pointers on how to get this right?

The end goal would be to have the timer re-fire only after the previous call has finished (either success or fail).


Solution

  • You should do something like the code below:

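      func getLocations(interval: NSTimeInterval) -> Observable<[Location]> {

        return Observable<[Location]>.create { observer in

          // Tick at the caller-supplied interval (no hard-coded value shadowing the parameter).
          let getLocationDisposable = Observable<Int64>.interval(interval, scheduler: MainScheduler.instance)
            .flatMap { _ in
              // Start a request on every tick; flatMap subscribes to it, so the call actually runs.
              NSURLSession.sharedSession().rx_JSON(API.locationsRequest)
            }
            .map { json in
              // Same mapping as in the question.
              json.flatMap { Location($0) }
            }
            // Forward results (and any request error, which ends the stream) to the outer observer.
            .subscribe(observer)

          return AnonymousDisposable {
            getLocationDisposable.dispose()
          }

        }
      }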
    

    The code above fires API.locationsRequest on a timer and sends each result on the same observable. Note that you have to dispose of the interval subscription when the main observable is disposed.
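
    A fixed interval timer keeps ticking whether or not the previous request has finished, so it does not quite match the end goal stated in the question (re-fire only after the previous call has finished, success or fail). One possible way to get that behaviour is sketched below. It assumes RxSwift 6 syntax (`catch`, `RxTimeInterval`) rather than the older API used above, and `poll` and its parameter names are hypothetical, not part of RxSwift:

```swift
import Foundation
import RxSwift

/// Runs `request`, waits `interval` after it finishes (successfully or not),
/// then runs it again, forever. `poll` is a hypothetical helper name.
func poll<T>(every interval: RxTimeInterval,
             scheduler: SchedulerType = MainScheduler.instance,
             request: @escaping () -> Observable<T>) -> Observable<T> {
    // Emits nothing and completes once `interval` has elapsed.
    let pause = Observable<Int>.timer(interval, scheduler: scheduler)
        .flatMap { _ in Observable<T>.empty() }

    // One cycle: build a fresh request, swallow its error so a failed call
    // does not end the stream, then pause before completing.
    let cycle = Observable<T>.deferred {
        request().catch { _ in Observable<T>.empty() }
    }
    .concat(pause)

    // concat subscribes to the next cycle only after the current one has
    // completed, so requests never overlap. The sequence is lazy and infinite.
    return Observable<T>.concat(sequence(first: cycle) { _ in cycle })
}
```

    With a hypothetical `fetchLocations()` that returns `Observable<[Location]>`, usage could look like `poll(every: .seconds(20)) { fetchLocations() }`. The first request starts on subscription, and disposing the subscription stops both the pending request and the pause timer. Errors are silently dropped here; log or surface them in the `catch` closure if they matter.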