In my projet I have a method taht subscribe each on my observable the same way. I'm trying to enhanced it by puttin the retryWhen option on it.
To avoid a big retrywhen to handle different error I have design this logique
A RetryFunction class that is generic
abstract class RxStreamLimitedRetryFunction(private val nbOfAttempts: Int, val streamId: String) : Function<Observable<Throwable>, Observable<*>> {
override fun apply(t: Observable<Throwable>): Observable<*> {
return t.flatMap {
if (shouldRetry(it)) Observable.just(it)
else Observable.empty()
}.zipWith(Observable.range(0, nbOfAttempts + 1), BiFunction<Throwable, Int, Int> { throwable, attempts ->
if (attempts == nbOfAttempts) {
throw RetryMaxAttemptsException(nbOfAttempts)
} else {
Log.d("Retry nb ${attempts + 1} out of $nbOfAttempts for stream with id : $streamId with error ${throwable.message} ")
attempts
}
}).flatMap { onRetry(it) }
}
abstract fun onRetry(attempsNb: Int): Observable<*>
abstract fun shouldRetry(throwable: Throwable): Boolean
}
two child class each with different retry attemps following the error
class RxStream404Retry(streamId: String) : RxStreamLimitedRetryFunction(4, streamId) {
override fun onRetry(attempsNb: Int): Observable<*> {
return Observable.timer(500, TimeUnit.MILLISECONDS)
}
override fun shouldRetry(throwable: Throwable): Boolean {
return true
} }
class RxStream500Retry(streamId: String) : RxStreamLimitedRetryFunction(2, streamId) {
override fun onRetry(attempsNb: Int): Observable<*> {
return Observable.timer(500, TimeUnit.MILLISECONDS)
}
override fun shouldRetry(throwable: Throwable): Boolean {
return false
}}
All of this retry function find they way in a list of retryfunction that is set using an ObservableTransformer to the observable via a retryWhen per function
class RetryComposer : ObservableTransformer<RxStreamSuccess, RxStreamSuccess> {
val retryFunctionList = arrayListOf(RxStream404Retry("Test1"),
RxStream500Retry("Test2")
)
override fun apply(upstream: Observable<RxStreamSuccess>): ObservableSource<RxStreamSuccess> {
retryFunctionList.forEach {
upstream.retryWhen(it)
}
return upstream
}}
My subscribing chain looks like this :
streamCache[stremId] = observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe { listener.onLoading() }
.compose(RetryComposer())
.doOnComplete {
Log.d(" Retry onComplete")
streamCache.remove(stremId) }
.subscribe(
{ result -> listener.onSuccess(result) },
{ throwable ->
streamCache.remove(stremId)
}
)
When I test with an observable that goes into error nothing happen my RxStream404Retry is not trigger. Can you not put more thant one retryWhen per observable ?
Thank a lot
I think the issue comes from:
retryFunctionList.forEach {
upstream.retryWhen(it) <- this returns a new Observable that is not attached to any subscriber
}
This code is equivalent to:
Observable obs1 = upstream.retryWhen(RxStream404Retry("Test1"))
Observable obs2 = upstream.retryWhen(RxStream500Retry("Test2"))
return upstream
So, these observables are not subscribed by the subscriber of the main Rx chain.
You may have look at the amb()
operators for that (http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#amb-java.lang.Iterable-)
You may try something like:
return upstream.retryWhen(amb(retryFunctionList)) // pseudo code
That would be the rough idea.