
Multiple RxJava retryWhen operators for handling different errors


In my project I have a method that subscribes to each of my observables the same way. I'm trying to enhance it by adding a retryWhen to the chain.

To avoid one big retryWhen that handles every kind of error, I designed the following:

A generic retry function class:

abstract class RxStreamLimitedRetryFunction(private val nbOfAttempts: Int, val streamId: String) : Function<Observable<Throwable>, Observable<*>> {

    override fun apply(t: Observable<Throwable>): Observable<*> {
        return t.flatMap {
            if (shouldRetry(it)) Observable.just(it)
            else Observable.empty()
        }.zipWith(Observable.range(0, nbOfAttempts + 1), BiFunction<Throwable, Int, Int> { throwable, attempts ->
            if (attempts == nbOfAttempts) {
                throw RetryMaxAttemptsException(nbOfAttempts)
            } else {
                Log.d("Retry nb ${attempts + 1} out of $nbOfAttempts for stream with id : $streamId with error ${throwable.message} ")
                attempts
            }
        }).flatMap { onRetry(it) }
    }

    abstract fun onRetry(attempsNb: Int): Observable<*>
    abstract fun shouldRetry(throwable: Throwable): Boolean
}

Two child classes, each with a different number of retry attempts depending on the error:

class RxStream404Retry(streamId: String) : RxStreamLimitedRetryFunction(4, streamId) {
    override fun onRetry(attempsNb: Int): Observable<*> {
        return Observable.timer(500, TimeUnit.MILLISECONDS)
    }

    override fun shouldRetry(throwable: Throwable): Boolean {
        return true
    }
}

class RxStream500Retry(streamId: String) : RxStreamLimitedRetryFunction(2, streamId) {
    override fun onRetry(attempsNb: Int): Observable<*> {
        return Observable.timer(500, TimeUnit.MILLISECONDS)
    }

    override fun shouldRetry(throwable: Throwable): Boolean {
        return false
    }
}
  • The shouldRetry method is simplified in this example

All of these retry functions go into a list, and an ObservableTransformer applies them to the observable, one retryWhen per function:

class RetryComposer : ObservableTransformer<RxStreamSuccess, RxStreamSuccess> {

    val retryFunctionList = arrayListOf(
        RxStream404Retry("Test1"),
        RxStream500Retry("Test2")
    )

    override fun apply(upstream: Observable<RxStreamSuccess>): ObservableSource<RxStreamSuccess> {
        retryFunctionList.forEach {
            upstream.retryWhen(it)
        }

        return upstream
    }
}

My subscription chain looks like this:

streamCache[stremId] = observable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnSubscribe { listener.onLoading() }
    .compose(RetryComposer())
    .doOnComplete {
        Log.d(" Retry onComplete")
        streamCache.remove(stremId)
    }
    .subscribe(
        { result -> listener.onSuccess(result) },
        { throwable ->
            streamCache.remove(stremId)
        }
    )

When I test with an observable that errors, nothing happens: my RxStream404Retry is not triggered. Is it not possible to put more than one retryWhen on an observable?

Thanks a lot


Solution

  • I think the issue comes from:

    retryFunctionList.forEach {
        upstream.retryWhen(it) // returns a new Observable that nothing subscribes to
    }
    

    This code is equivalent to:

    val obs1 = upstream.retryWhen(RxStream404Retry("Test1")) // never used
    val obs2 = upstream.retryWhen(RxStream500Retry("Test2")) // never used
    return upstream
    

    So these observables are never subscribed to by the subscriber of the main Rx chain; only the unmodified upstream is returned. You may want to look at the amb() operator for that (http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html#amb-java.lang.Iterable-)

    You may try something like:

    return upstream.retryWhen(amb(retryFunctionList)) // pseudo code
    

    That would be the rough idea.
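
As an alternative to the amb() idea, the retry functions can probably also be chained, so that each retryWhen wraps the Observable returned by the previous one and the last one is what gets returned. The following is only a minimal sketch under assumptions, not the asker's actual code: LimitedRetry is a hypothetical stand-in for RxStreamLimitedRetryFunction, RetryComposer is made generic and takes the list as a constructor parameter, and retryDemo exists purely for illustration. One deliberate difference: for an error it does not handle, the sketch emits Observable.error(...) instead of Observable.empty(), because a handler that emits nothing neither retries nor terminates, so the error would never reach the next retryWhen in the chain.

```kotlin
import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.ObservableTransformer
import io.reactivex.functions.Function
import java.io.IOException

// Hypothetical stand-in for RxStreamLimitedRetryFunction: retries matching
// errors up to maxAttempts times and forwards every other error downstream.
class LimitedRetry(
    private val maxAttempts: Int,
    private val shouldRetry: (Throwable) -> Boolean
) : Function<Observable<Throwable>, ObservableSource<*>> {

    override fun apply(errors: Observable<Throwable>): ObservableSource<*> {
        var attempts = 0 // retryWhen calls apply() once per subscriber, so this is not shared
        return errors.flatMap<Any> { error ->
            if (shouldRetry(error) && attempts++ < maxAttempts) {
                Observable.just<Any>(Unit) // any emission triggers a resubscription
            } else {
                Observable.error<Any>(error) // not handled here, or out of attempts: pass it on
            }
        }
    }
}

class RetryComposer<T>(
    private val retryFunctions: List<Function<Observable<Throwable>, ObservableSource<*>>>
) : ObservableTransformer<T, T> {

    override fun apply(upstream: Observable<T>): ObservableSource<T> =
        // Keep the Observable returned by each retryWhen and feed it to the next one.
        retryFunctions.fold(upstream) { chain, retryFunction -> chain.retryWhen(retryFunction) }
}

// Illustration only: a source that fails on its first two subscriptions.
fun retryDemo(): String {
    var subscriptions = 0
    val flaky = Observable.defer<String> {
        subscriptions++
        if (subscriptions < 3) Observable.error<String>(IllegalStateException("boom"))
        else Observable.just("ok")
    }
    return flaky
        .compose(RetryComposer<String>(listOf(
            LimitedRetry(4) { it is IllegalStateException },
            LimitedRetry(2) { it is IOException }
        )))
        .blockingFirst()
}
```

In this sketch the first LimitedRetry resubscribes after each IllegalStateException, so retryDemo() ends up returning "ok" on the third subscription. Note that when an outer retryWhen resubscribes, the inner ones are resubscribed too and their attempt counters start over, which may or may not be what you want.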