Search code examples
swiftrx-swiftrefresh-tokengrpc-swift

RxSwift refreshToken use retry(when:)


I wrote such code for network request token refresh.

enum NetworkingClient {
    private static func _makeUnaryCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
        call maker: @escaping () -> UnaryCall<Request, Response>
        ) -> Single<Response> {
        return Single<Response>.create { (handler) -> Disposable in
            let call = maker()
            call.response
                .whenComplete {
                    do {
                        handler(.success(try $0.get()))
                    } catch {
                        handler(.error(error))
                    }
                }
            return Disposables.create()
        }
        .subscribeOn(requestScheduler)
    }

    static func makeUnaryCall<Request: SwiftProtobuf.Message, Response: SwiftProtobuf.Message>(
        serverInfo: ServerInfo,
        call maker: @escaping () -> UnaryCall<Request, Response>
        ) -> Single<Response> {
        let gid = serverInfo.gid
        let uid = serverInfo.uid
        return self._makeUnaryCall(call: maker)
            .retryWhen { errorObservable -> Observable<Void> in
                return errorObservable.flatMap { error -> Observable<Void> in
                    if let errorCode = error as? NetworkingError, case .tokenExpired(let token) = errorCode {
                        return VoidToken.reLogin(token: token!, serverInfo: serverInfo)
                    }
                    throw error
                }            }
            .do(onError: { (err) in
                log.error("\(gid)-\(uid)-\(serverInfo.host)-\(serverInfo.port)-\(err)")
            })
            .observeOn(MainScheduler.instance)
    }
}

enum VoidToken {
    static var caches: [String: Observable<Void>] = [:]
    static let lock = NSLock()
    static func reLogin(token: Token, serverInfo: ServerInfo) -> Observable<Void> {
        VoidToken.lock.lock()
        if let refreshTokenReq = VoidToken.caches[token.accessToken] {
            VoidToken.lock.unlock()
            return refreshTokenReq
        }
        var service: LoginService? = LoginService(serverInfo: serverInfo)
        let refreshReq = service!
            .refreshToken(refreshToken: token.refreshToken)
            .retryWhen({ errorObservable in
                return errorObservable.flatMap { error -> Observable<Void> in
                    if let err = error as? GRPCStatus, err.code == .unavailable {
                        return .just(())
                    }
                    throw error
                }
            })
            .debug("refreshToken")
            .asObservable()
            .share(replay: 1, scope: .forever)
            .map { _ in }
            .do(afterCompleted: {
                service = nil
            })
        VoidToken.caches[token.accessToken] = refreshReq
        VoidToken.lock.unlock()
        return refreshReq
    }
}

enum NetworkingError: Error {
    case tokenExpired(Token?)
}

extension LoginService {
    func refreshToken(refreshToken: String, loginChatService: Bool = true) -> Single<Void> {
        let gid = self.gid
        let uid = self.uid
        return defaultImplementation
            .reLogin(refreshToken: refreshToken)
            .map { (rsp) -> Token in
                precondition(rsp.hasSess)
                AccountKit.shared.updateToken(pid: gid + uid, session: rsp.sess)
                let token = AccountKit.shared.fetchAuth(gid: gid, uid: uid)!
                log.warn("token refresh success, newToken: \(token.accessToken) newIMToken\(token.imToken)")
                return token
            }
            .flatMap { token -> Single<Void> in
                guard loginChatService else { return Single.just(()) }
                guard let chatService = AccountKit.shared.chatInfos.fetchWrapper(pid: gid + uid) else {
                    throw SourceContextError()
                }
                return chatService.rx.login(imToken: token.imToken, updateSyncTime: false)
                    .map { _ in () }
                    .catchErrorJustReturn(())
            }
    }
}

But, I get a log that looks like this

2021-07-19 16:24:24.208: refreshToken -> subscribed
2021-07-19 16:24:24.494: refreshToken -> isDisposed
2021-07-19 16:24:25.037: refreshToken -> subscribed
2021-07-19 16:24:25.251: refreshToken -> Event error(notLogin)
2021-07-19 16:24:25.251: refreshToken -> isDisposed

I don’t know why Refresh Token isDisposed when there is no ON NEXT
This is an occasional problem, but I don’t know what caused it
Its log output should normally look like this:

2021-07-22 10:04:45.794: refreshToken -> subscribed
2021-07-22 10:04:52.254: refreshToken -> Event next(())
2021-07-22 10:04:52.266: refreshToken -> Event completed
2021-07-22 10:04:52.266: refreshToken -> isDisposed

I'd really appreciate it if you could help me.


Solution

  • Looking at the Observable chain above the debug statement:

    let refreshReq = service!
        .refreshToken(refreshToken: token.refreshToken)
        .retryWhen({ errorObservable in
            return errorObservable.flatMap { error -> Observable<Void> in
                if let err = error as? GRPCStatus, err.code == .unavailable {
                    return .just(())
                }
                throw error
            }
        })
    

    The above will emit an error when either refreshToken(refreshToken:) or retryWhen emits an error of any type other than a GRPCStatus error with a code equal to .unavailable .

    You say the debug is showing an error called notLogin which apparently is not a GRPCStatus error with a code equal to unavailable.

    Also, looking at the log:

    2021-07-19 16:24:24.208: refreshToken -> subscribed
    2021-07-19 16:24:24.494: refreshToken -> isDisposed
    2021-07-19 16:24:25.037: refreshToken -> subscribed
    2021-07-19 16:24:25.251: refreshToken -> Event error(notLogin)
    2021-07-19 16:24:25.251: refreshToken -> isDisposed
    

    This tells you that something disposed the refreshToken disposable 286 milliseconds after it subscribed, then resubscribed (or something else subscribed) 543 milliseconds later which emitted the error after another 214 milliseconds...

    To me that says the problem isn't the code you posted, but rather the code that prematurely disposed your reLogin observable.

    BTW, I wrote an article on this subject that you might find helpful: RxSwift and Handling Invalid Tokens