Buffer Observable Until Another Observable Completes


I'm using RxSwift to wrap a mobile app's server sync process. I have an Observable<RemoteEvent> that wraps a websocket connection and emits each message received as an Event. Similarly, I have an Observable<SynchronizationResult> that wraps an API sync process. Once my application opens the WebSocket connection, the server sends a hello message. After that message is received, I want to start the sync process and buffer any events until the sync is completed. This is where I'm struggling. Currently, I have:

self.eventStreamService.observe(connection).scan((nil, [])) { (state, event) -> (Observable<RemoteEvent>?, [RemoteEvent]) in
  guard event.type == "hello" else {
    return (state.0?.concat(Observable.just(event)), state.1 + [event])
  }

  // This is the sync operation
  return (
    self.synchronizationService
      .synchronize(ConnectionSynchronizationContext(connection: connection), lightweight: true)
      .toArray()
      .flatMap { results -> Observable<RemoteEvent> in
        (state.1 + [event]).toObservable()
      },
    []
  )
}
.flatMapLatest { $0.0 ?? Observable.empty() }

Besides being fairly ugly, this has a significant bug: every incoming event causes the synchronization Observable to be re-subscribed, which restarts the whole sync process. I'm sure there must be a better way to do this.
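The re-subscription problem can be reproduced without RxSwift. The sketch below is a simplified model, not RxSwift itself: `ColdSource` and `syncStarts` are hypothetical names introduced only for illustration. It assumes the sync Observable is cold, meaning its work runs once per subscriber, so a new subscription for every incoming event starts the work again each time.

```swift
// Hypothetical stand-in for a cold observable: the work runs once per subscriber.
struct ColdSource<Element> {
    let onSubscribe: (@escaping (Element) -> Void) -> Void

    func subscribe(_ observer: @escaping (Element) -> Void) {
        onSubscribe(observer)
    }
}

// Counts how many times the (pretend) sync has been started.
var syncStarts = 0

let sync = ColdSource<String>(onSubscribe: { observer in
    syncStarts += 1            // the side effect happens on every subscription
    observer("synced")
})

// Three incoming events, each one triggering a fresh subscription,
// which is the effect the question describes.
for _ in 1...3 {
    sync.subscribe { _ in }
}

print(syncStarts) // 3
```

The sync should be subscribed to only once, when the hello message arrives.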


Solution

  • Here's how you could get the functionality you're looking for:

    // this is a stub for the purpose of the example
    let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    let websocketEvents = interval
        .map { i -> String in
            if i == 1 {
                return "hello"
            } else {
                return String(i)
            }
        }
        .replayAll()
    
    websocketEvents.connect()
    
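    func performSync() -> Observable<Void> {
        return Observable<Void>.create { o in
            print("starting sync")
            // actually start sync with server
            // ....
            // `delay` is assumed to be a helper that runs its closure
            // after the given number of seconds
            delay(2.0) {
                print("sync finished")
                o.onNext(())
                // the sync is a one-shot operation, so signal completion too
                o.onCompleted()
            }
            return NopDisposable.instance
        }
    }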
    
    // websocket events as they come, regardless of sync status
    websocketEvents
        .subscribeNext { e in
            print("websocket event received: \(e)")
        }
    
    // all websocket events, buffered and only emitted post-sync
    websocketEvents
        .filter { $0 == "hello" }
        .flatMapLatest { _ in performSync() }
        .flatMapLatest { _ in websocketEvents }
        .subscribeNext { e in
            print("websocket event post sync: \(e)")
        }
    

    This will output:

    websocket event received: 0
    websocket event received: hello
    starting sync
    websocket event received: 2
    websocket event received: 3
    sync finished
    websocket event post sync: 0
    websocket event post sync: hello
    websocket event post sync: 2
    websocket event post sync: 3
    websocket event received: 4
    websocket event post sync: 4
    websocket event received: 5
    websocket event post sync: 5
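The buffering behaviour itself can also be modelled as a small state machine in plain Swift, independent of RxSwift. This is only a sketch under stated assumptions: `SyncGate`, `receive` and `syncCompleted` are hypothetical names, not part of RxSwift or of the code above. Unlike the `replayAll()` approach, this gate only holds events that arrive while it is closed, so it would be created when the hello message is received.

```swift
// Hypothetical helper: holds events back until `syncCompleted()` is called.
struct SyncGate<Event> {
    private var buffer: [Event] = []
    private var isOpen = false

    init() {}

    /// Returns the events that may be delivered downstream right now.
    mutating func receive(_ event: Event) -> [Event] {
        if isOpen {
            return [event]     // sync already finished: pass straight through
        }
        buffer.append(event)   // sync still running: hold the event back
        return []
    }

    /// Opens the gate and returns everything buffered so far, in arrival order.
    mutating func syncCompleted() -> [Event] {
        isOpen = true
        let pending = buffer
        buffer.removeAll()
        return pending
    }
}

var gate = SyncGate<String>()
var delivered: [String] = []

delivered += gate.receive("hello")   // buffered
delivered += gate.receive("2")       // buffered
delivered += gate.receive("3")       // buffered
delivered += gate.syncCompleted()    // flushes "hello", "2", "3"
delivered += gate.receive("4")       // delivered immediately

print(delivered) // ["hello", "2", "3", "4"]
```

In the Rx version above, `replayAll()` plays the role of the buffer and the second `flatMapLatest` plays the role of opening the gate.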