Search code examples
androidsignalrrx-javasignalr-hubsignalr.client

Android SIgnalR Java Client - OnErrorNotImplementedException


recently started having this issue, I can't seem to reproduce it myself, but it's being reported on the crashlytics and is gaining momentum.

Fatal Exception: io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.RuntimeException: java.net.SocketException: Socket closed
   at io.reactivex.rxjava3.internal.observers.EmptyCompletableObserver.onError(EmptyCompletableObserver.java:50)
   at io.reactivex.rxjava3.internal.operators.completable.CompletableAndThenCompletable$SourceObserver.onError(CompletableAndThenCompletable.java:62)
   at io.reactivex.rxjava3.internal.operators.completable.CompletableAndThenCompletable$NextObserver.onError(CompletableAndThenCompletable.java:104)
   at io.reactivex.rxjava3.internal.operators.completable.CompletableDoOnEvent$DoOnEvent.onError(CompletableDoOnEvent.java:64)
   at io.reactivex.rxjava3.subjects.CompletableSubject.onError(CompletableSubject.java:126)
   at com.microsoft.signalr.OkHttpWebSocketWrapper$SignalRWebSocketListener.onFailure(OkHttpWebSocketWrapper.java:141)
   at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.kt:592)
   at okhttp3.internal.ws.RealWebSocket$connect$1.onResponse(RealWebSocket.kt:197)
   at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
   at java.lang.Thread.run(Thread.java:923)

I can't even trace the issue as this is being reported on Crashlytics. Started receiving this issue since I upgraded the SignalR lib from 3.0.0 to 6.0.8.

Here is the code that I'm using for connection:

    override fun init() {
    if (this::connection.isInitialized.not() || connection.connectionState != HubConnectionState.CONNECTED) {
        connection =
            HubConnectionBuilder.create("$baseUrl$hubEndpoint").withTransport(TransportEnum.WEBSOCKETS)
                .withAccessTokenProvider(getRefreshTokenSingle().onErrorReturnItem(""))
                .build()

            Timber.d("init")

        connection.onClosed {
            _connectionState.postValue(HubConnectionState.DISCONNECTED)

            if (it != null) {
                reconnect().subscribe({
                    _connectionState.postValue(HubConnectionState.CONNECTED)
                    Timber.d("reconnected")
                }, {
                    _connectionState.postValue(HubConnectionState.DISCONNECTED)
                    Timber.d("disconnected")
                })
            }
        }
    }
}

    private fun getRefreshTokenSingle(): Single<String> {
    return Single.create { emitter ->
        scope.launch {
            val accessToken = preferences.getAccessToken().orEmpty()
            val refreshToken = preferences.getRefreshToken().orEmpty()
            val tokenHeader = "${Constants.HTTP_HEADER_BEARER} $accessToken"

            authRepository.refreshToken(RefreshTokenRequest(refreshToken), tokenHeader).fold({
                if (isTokenRefreshError(it)) {
                    authRepository.logout()
                }

                emitter.tryOnError(it)
            }, {
                emitter.onSuccess(it.token.orEmpty())
            })
        }
    }
}

UPDATE: Here's how I'm using reconnect:

    private fun reconnect(): Completable {
    return stop()
        .andThen(
            Completable.create {
                _connectionState.postValue(HubConnectionState.CONNECTING)
                Timber.d("connecting")
                it.onComplete()
            })
        .andThen(
            Completable.create { emitter ->
                start().subscribe({
                    emitter.onComplete()
                }, {
                    emitter.onError(it)
                })
            }.retryWhen(DelayRetry())
        )
}

Start Stop:

override fun start(): Completable {
    if (connecting.get() || connection.connectionState == HubConnectionState.CONNECTED)
        return Completable.complete()

    connecting.set(true)

    _connectionState.postValue(HubConnectionState.CONNECTING)

    return connection.start()
        .doOnComplete {
            initializeHubServices()
            _connectionState.postValue(HubConnectionState.CONNECTED)
        }
        .doFinally {
            connecting.set(false)
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
}

override fun stop(): Completable {
    return if (!this::connection.isInitialized)
        Completable.complete()
    else
        this.connection.stop()
            .andThen {
                _connectionState.postValue(HubConnectionState.DISCONNECTED)
                disposeHubServices()
                cancelRefreshTokenJob()
                it.onComplete()
                Timber.d("disconnected")
            }
}

Solution

  • I have managed to solve this issue:

    My hub stop subscribe method was missing the onError in a fragment. I added it like:

    hub.stop().subscribe({},{ Timber.d(it) })
    

    and the issue is gone now....