I am initiating an operation via a REST API in two steps: a first request starts the operation and returns a task ID, and I then poll that task ID until the operation finishes.
Polling the task ID returns a 202 while the operation is still in progress and a 200 when it completes. Any other code is an error.
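The status-code rule above can be sketched as a small pure function. The names `PollOutcome` and `classify(statusCode:)` are hypothetical and only illustrate the mapping; they are not part of my actual code.

```swift
// Hypothetical helper: maps the HTTP status of a polling response
// to the three cases described above.
enum PollOutcome: Equatable {
    case inProgress               // 202: the operation is still running, keep polling
    case completed                // 200: the operation finished
    case failed(statusCode: Int)  // any other code is treated as an error
}

func classify(statusCode: Int) -> PollOutcome {
    switch statusCode {
    case 202: return .inProgress
    case 200: return .completed
    default:  return .failed(statusCode: statusCode)
    }
}
```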
I need to communicate the response of each step to the subscribers.
Previously, I would have the do operator push the response in between steps to a ReplaySubject.
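startReboot()
    .do(onNext: { response in
        operationStatus.onNext(response) // push the first step's response to the subject
    })
    .flatMap({ response in
        // If we could not get the task ID from the response we error
        guard let taskID = getTaskIDFromJSON(response) else { return Observable.error(API.serverError) }
        return Observable.just(taskID)
    })
    .flatMap({ taskID in
        return pollTask(withID: taskID) // internally, it uses retryWhen to check the api again with a five second delay
    })
    .do(onNext: { response in
        operationStatus.onNext(response) // push each polling response to the subject
    })
    .subscribe(onError: { _ in
        // handle the failure
    }, onCompleted: {
        // the operation finished
    })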
And somewhere else, a subscriber to the subject would do the following:
operationStatus.subscribe(onNext: { response in
    // show the progress and the response of the current step
})
So essentially I am showing the progress of the operation and the response we get from each step at the same time.
At the time I was not familiar enough with Rx to come up with a cleaner solution. Now that I have familiarized myself with it, it seems to me that there should be a solution that avoids side effects and keeps everything in a single final observable. Still, I cannot find a way to do it.
I was thinking about something like this:
let opObs = startReboot()
let pollingObs = pollTask(/* where does the id come from? */)
Observable.concat(opObs, pollingObs)
    .subscribe(onNext: { response in
        // handle the response of each step
    }, onError: { _ in
        // handle the failure
    }, onCompleted: {
        // the operation finished
    })
But that would imply that once opObs is done I would need to save the task ID in a variable outside of the monad and wrap pollingObs to fetch it when it starts, once again introducing side effects.
Is there an operator or combination of operators that I can use to emit the response of each step to a subscriber and also pass it to another observable?
Something like this should work. Note the use of share() to avoid triggering the startReboot sequence twice.
let opObs = startReboot().share()
let pollingObs = opObs.flatMap { response in
    guard let taskID = getTaskIDFromJSON(response) else { return Observable.error(API.serverError) }
    return pollTask(withID: taskID)
}
Observable.concat(opObs, pollingObs)
    .subscribe(onNext: { response in
        // handle the response of each step
    }, onError: { _ in
        // handle the failure
    }, onCompleted: {
        // the operation finished
    })
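As a possible alternative, here is a minimal sketch that keeps everything in one chain by re-emitting the first response with startWith inside flatMap. Because startReboot is then subscribed only once, nothing needs to be shared. The `Response` typealias, the `API` enum, and the stub bodies of `startReboot`, `getTaskIDFromJSON` and `pollTask(withID:)` are assumptions made only so the example is self-contained; the real ones would call the REST API.

```swift
import RxSwift

// Hypothetical stand-ins for the types and functions used in the question.
typealias Response = [String: Any]
enum API: Error { case serverError }

func startReboot() -> Observable<Response> {
    return .just(["taskID": "42"])           // stub: pretend the server returned a task ID
}
func getTaskIDFromJSON(_ response: Response) -> String? {
    return response["taskID"] as? String
}
func pollTask(withID id: String) -> Observable<Response> {
    return .just(["status": 200])            // stub: pretend polling finished at once
}

let operation = startReboot().flatMap { response -> Observable<Response> in
    // Decide what follows the first response: polling, or an error
    // when the task ID cannot be extracted.
    let next: Observable<Response>
    if let taskID = getTaskIDFromJSON(response) {
        next = pollTask(withID: taskID)
    } else {
        next = .error(API.serverError)
    }
    // Emit the first step's response, then whatever `next` produces.
    return next.startWith(response)
}

let bag = DisposeBag()
operation
    .subscribe(onNext: { response in
        print("step response:", response)
    }, onError: { error in
        print("failed:", error)
    }, onCompleted: {
        print("operation finished")
    })
    .disposed(by: bag)
```

The subscriber sees the start response first and then each polling response, without a subject or any state outside the chain.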