I recently started seeing this crash. I can't reproduce it myself, but it is being reported in Crashlytics and the number of reports keeps growing.
Fatal Exception: io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.RuntimeException: java.net.SocketException: Socket closed
    at io.reactivex.rxjava3.internal.observers.EmptyCompletableObserver.onError(EmptyCompletableObserver.java:50)
    at io.reactivex.rxjava3.internal.operators.completable.CompletableAndThenCompletable$SourceObserver.onError(CompletableAndThenCompletable.java:62)
    at io.reactivex.rxjava3.internal.operators.completable.CompletableAndThenCompletable$NextObserver.onError(CompletableAndThenCompletable.java:104)
    at io.reactivex.rxjava3.internal.operators.completable.CompletableDoOnEvent$DoOnEvent.onError(CompletableDoOnEvent.java:64)
    at io.reactivex.rxjava3.subjects.CompletableSubject.onError(CompletableSubject.java:126)
    at com.microsoft.signalr.OkHttpWebSocketWrapper$SignalRWebSocketListener.onFailure(OkHttpWebSocketWrapper.java:141)
    at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.kt:592)
    at okhttp3.internal.ws.RealWebSocket$connect$1.onResponse(RealWebSocket.kt:197)
    at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
    at java.lang.Thread.run(Thread.java:923)
I can't trace the issue because it only shows up in Crashlytics. The reports started after I upgraded the SignalR library from 3.0.0 to 6.0.8.
Here is the code I use to set up the connection:
override fun init() {
    if (this::connection.isInitialized.not() || connection.connectionState != HubConnectionState.CONNECTED) {
        connection =
            HubConnectionBuilder.create("$baseUrl$hubEndpoint").withTransport(TransportEnum.WEBSOCKETS)
                .withAccessTokenProvider(getRefreshTokenSingle().onErrorReturnItem(""))
                .build()
        Timber.d("init")
        connection.onClosed {
            _connectionState.postValue(HubConnectionState.DISCONNECTED)
            if (it != null) {
                reconnect().subscribe({
                    _connectionState.postValue(HubConnectionState.CONNECTED)
                    Timber.d("reconnected")
                }, {
                    _connectionState.postValue(HubConnectionState.DISCONNECTED)
                    Timber.d("disconnected")
                })
            }
        }
    }
}
private fun getRefreshTokenSingle(): Single<String> {
    return Single.create { emitter ->
        scope.launch {
            val accessToken = preferences.getAccessToken().orEmpty()
            val refreshToken = preferences.getRefreshToken().orEmpty()
            val tokenHeader = "${Constants.HTTP_HEADER_BEARER} $accessToken"
            authRepository.refreshToken(RefreshTokenRequest(refreshToken), tokenHeader).fold({
                if (isTokenRefreshError(it)) {
                    authRepository.logout()
                }
                emitter.tryOnError(it)
            }, {
                emitter.onSuccess(it.token.orEmpty())
            })
        }
    }
}
UPDATE: Here's how I'm using reconnect:
private fun reconnect(): Completable {
    return stop()
        .andThen(
            Completable.create {
                _connectionState.postValue(HubConnectionState.CONNECTING)
                Timber.d("connecting")
                it.onComplete()
            })
        .andThen(
            Completable.create { emitter ->
                start().subscribe({
                    emitter.onComplete()
                }, {
                    emitter.onError(it)
                })
            }.retryWhen(DelayRetry())
        )
}
Here are start() and stop():
override fun start(): Completable {
    if (connecting.get() || connection.connectionState == HubConnectionState.CONNECTED)
        return Completable.complete()
    connecting.set(true)
    _connectionState.postValue(HubConnectionState.CONNECTING)
    return connection.start()
        .doOnComplete {
            initializeHubServices()
            _connectionState.postValue(HubConnectionState.CONNECTED)
        }
        .doFinally {
            connecting.set(false)
        }
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
}
override fun stop(): Completable {
    return if (!this::connection.isInitialized)
        Completable.complete()
    else
        this.connection.stop()
            .andThen {
                _connectionState.postValue(HubConnectionState.DISCONNECTED)
                disposeHubServices()
                cancelRefreshTokenJob()
                it.onComplete()
                Timber.d("disconnected")
            }
}
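I managed to solve this issue.
In one of my fragments, the subscribe() call on hub.stop() had no onError handler, so a failure while stopping the connection (here the SocketException: Socket closed) surfaced as an OnErrorNotImplementedException. I added the handler like this:
    hub.stop().subscribe({}, { Timber.d(it) })
and the crash is gone now.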
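
To illustrate why the missing handler mattered, here is a minimal sketch, assuming RxJava 3 is on the classpath. The names failingStop, subscribeWithoutOnError and subscribeWithOnError are hypothetical and not part of the original project; failingStop only stands in for a hub.stop() that fails the way the crash report shows. Without an onError callback, RxJava wraps the failure in OnErrorNotImplementedException and passes it to the global handler in RxJavaPlugins (which this sketch replaces temporarily so it can record the error). With an onError callback, the original exception is delivered to the subscriber instead.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException
import io.reactivex.rxjava3.plugins.RxJavaPlugins
import java.net.SocketException

// Hypothetical stand-in for hub.stop(): fails with the same cause as the crash report.
fun failingStop(): Completable =
    Completable.error(RuntimeException(SocketException("Socket closed")))

// Subscribes WITHOUT onError and returns whatever reached the global RxJava error handler.
fun subscribeWithoutOnError(source: Completable): Throwable? {
    var captured: Throwable? = null
    // Temporarily replace the global handler so the error is recorded here.
    RxJavaPlugins.setErrorHandler { captured = it }
    try {
        source.subscribe()
    } finally {
        RxJavaPlugins.reset() // restore default behaviour
    }
    return captured
}

// Subscribes WITH onError and returns the error delivered to that callback.
fun subscribeWithOnError(source: Completable): Throwable? {
    var captured: Throwable? = null
    source.subscribe({}, { captured = it })
    return captured
}

fun main() {
    val unhandled = subscribeWithoutOnError(failingStop())
    val handled = subscribeWithOnError(failingStop())
    println("without onError: ${unhandled?.javaClass?.simpleName}")
    println("with onError, cause: ${handled?.cause?.javaClass?.simpleName}")
}
```

Since any subscribe() call without an error callback can end up in this path, it may be worth searching the project for other bare subscribe() calls on the hub's Completables.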