Search code examples
iosrx-swiftreactivex

Emitting progress items while forwarding result in a single observable


I am initiating an operation via a REST API in two steps:

  1. Start operation and return a task id
  2. Poll task with the given id and complete the sequence when the operation returns complete.

The polling the task id will return a 202 which indicates that the operation is still in progress and a 200 when it completes. Any other code is an error.

I need to communicate to the subscribers the response of each step.

Previously, I would have the do operator push the response in between steps to a ReplaySubject.

startReboot()
  .do(onNext: { response in
     operationStatus.next(response)
  )
  .flatMap({ response in
     // If we could not get the task ID from the response we error
     guard let taskID = getTaskIDFromJSON(response) else { return Observable.error(API.serverError) }

     return Observable.just(taskID)
  })
  .flatMap({ taskID in
      return pollTask(withID: taskID) // internally, it uses retryWhen to check the api again with a five second delay
  })
  .do(onNext: { response in
     operationStatus.next(response)
  })
  .subscribe(onError: { _ in
     showOperationFailedIcon()
  }, onCompleted: {
     showOperationCompleteIcon()
  })

And somewhere else, a subscriber to the subject would do the following:

operationStatus.subscribe(onNext: { response
    showResponse(response)
})

So essentially I am showing the progress of the operation and the response we get from each step at the same time.

At the time I was not familiar with Rx to come up with a cleaner solution. But now that I have familiarized my self with it, it seems to me that there should be a solution where we don't use side effects and contain this to a single final observable. Still, I cannot find a way to do it.

I was thinking about something like this:

let opObs = startReboot()
let pollingObs = pollTask(/* where does the id come from? */)
Observable.concat(opObs, pollingObs)
   .subscribe(onNext : { response in
     showResponse(response)
   }, onError: { _ in
     showOperationFailedIcon()
   }, onCompleted: {
     showOperationCompleteIcon()
   })

But that would imply that once opObs is done I would need to save the task id in variable outside of the monad and wrap pollingObs to fetch it when it starts - once again introducing side effects.

Is there an operator or combination of operators that I can use to emit the response of each step to a subscriber and also pass it to another observable?


Solution

  • Something like this should work. Note the use of share() to avoid triggering twice the startReboot sequence.

    let opObs = startReboot().share()
    let pollingObs = opObs.flatMap {
        guard let taskID = getTaskIDFromJSON(response) else { return Observable.error(API.serverError)
    
        return pollTask(withID: taskID)
    }
    Observable.concat(opObs, pollingObs)
       .subscribe(onNext : { response in
         showResponse(response)
       }, onError: { _ in
         showOperationFailedIcon()
       }, onCompleted: {
         showOperationCompleteIcon()
       })