Search code examples
iosrx-javarx-swift

Rx: How to modify a shared source observable within a retry


Top Level Question: I want to know how, within a retry, I can modify its source observable if it is an observable shared between multiple subscribers (in this case a BehaviorSubject/Relay).

Solution(s) I have considered: The suggestion of using defer from this post doesn't seem to naturally port over if the source observable needs to be shared.

Use case (to fully elaborate the question)
Say I have a server connection object that, when initialized, connects to an url. Once it is created, I can also use it to get a data stream for a particular input.

class ServerConnection {
    var url: URL
    init(url: URL)
    func getDataStream(input: String) -> Observable<Data> // the observable also errors when the instance is destroyed.
}

However, one particular url or another may be broken or overloaded. So I may want to obtain the address of a mirror and generate a new ServerConnection object. Let's say I have such a function.

// At any point in time, gets the mirror of the url with the lowest load
func getLowestLoadMirror(url: URL) -> URL {}

Ideally, I want this "mirror url" switching should be an implementation detail. The user of my code may only care about the data they receive. So we would want to encapsulate this logic in a new class:

class ServerConnectionWithMirrors {
    private var currentConnection: BehaviorRelay<ServerConnection>
    init(startingURL: URL)
    func dataStream(for inputParams: String) -> Observable<Data>
}

// usage
let connection = ServerConnectionWithMirrors(startingURL: "www.example.com")
connection.dataStream(for: "channel1")
    .subscribe { channel1Data in
        // do something with channel1Data
    }.disposed(by: disposeBag)

connection.dataStream(for: "channel2")
    .subscribe { channel2Data in
        // do something with channel2Data
    }.disposed(by: disposeBag)

How should I write the dataStream() function for ServerConnectionWithMirrors? I should be using retries, but I need to ensure that the retries, when faced with a particular error (ServerOverLoadedError) update the value on the behaviorRelay.

Here is code that I have so far that demonstrates the crux at what I am trying to do. One problem is that multiple subscribers to the behaviorRelay may all update it in rapid succession when they get an error, where only one update would do.

func dataStream(for inputParams: String) -> Observable<Data> {
    self.currentConnection.asObservable()
        .flatMapLatest { server in
            return server.getDataStream(input: inputParams)
        }
        .retryWhen { errors in
            errors.flatMapLatest { error in
                if error is ServerOverLoadedError {
                    self.currentConnection.accept(ServerConnection(url: getLowestLoadURL()))
                } else {
                    return Observable.error(error)
                }
            }
        }
}

Solution

  • The answer to your top level question:

    I want to know how, within a retry, I can modify its source observable if it is an observable shared between multiple subscribers (in this case a BehaviorSubject/Relay).

    You cannot modify a retry's source observable from within the retry. (full stop) You cannot do this whether it is shared or not. What you can do is make the source observable in such a way that it naturally updates its data for every subscription.

    That is what the question you referred to is trying to explain.

    func getData(from initialRequest: URLRequest) -> Observable<Data> {
        return Observable.deferred {
            var correctRequest = initialRequest
            let correctURL = getLowestLoadMirror(url: initialRequest.url!)
            correctRequest.url = correctURL
            return Observable.just(correctRequest)
        }
        .flatMapLatest {
            getDataFromServer(request: $0)
        }
        .retryWhen { error in
            error
                .do(onNext: {
                    guard $0 is ServerOverloadedError else { throw $0 }
                })
        }
    }
    

    With the above code, every time deferred is retried, it will call its closure and every time its closure is called, the URL will the lowest load will be used.