Search code examples

Multiple Rxjava retryWhen for handling different error

In my projet I have a method taht subscribe each on my observable the same way. I'm trying to enhanced it by puttin the retryWhen option on it.

To avoid a big retrywhen to handle different error I have design this logique

A RetryFunction class that is generic

abstract class RxStreamLimitedRetryFunction(private val nbOfAttempts: Int, val streamId: String) : Function<Observable<Throwable>, Observable<*>> {

override fun apply(t: Observable<Throwable>): Observable<*> {
    return t.flatMap {
        if (shouldRetry(it)) Observable.just(it)
        else Observable.empty()
    }.zipWith(Observable.range(0, nbOfAttempts + 1), BiFunction<Throwable, Int, Int> { throwable, attempts ->
        if (attempts == nbOfAttempts) {
            throw RetryMaxAttemptsException(nbOfAttempts)
        } else {
            Log.d("Retry nb ${attempts + 1} out of $nbOfAttempts for stream with id : $streamId with error ${throwable.message} ")
    }).flatMap { onRetry(it) }


abstract fun onRetry(attempsNb: Int): Observable<*>
abstract fun shouldRetry(throwable: Throwable): Boolean


two child class each with different retry attemps following the error

class RxStream404Retry(streamId: String) : RxStreamLimitedRetryFunction(4, streamId) {
override fun onRetry(attempsNb: Int): Observable<*> {
    return Observable.timer(500, TimeUnit.MILLISECONDS)

override fun shouldRetry(throwable: Throwable): Boolean {
    return true
}  } 

class RxStream500Retry(streamId: String) : RxStreamLimitedRetryFunction(2, streamId) {
override fun onRetry(attempsNb: Int): Observable<*> {
    return Observable.timer(500, TimeUnit.MILLISECONDS)

override fun shouldRetry(throwable: Throwable): Boolean {
    return false
  • The shouldRetry method is simplified in this exemple

All of this retry function find they way in a list of retryfunction that is set using an ObservableTransformer to the observable via a retryWhen per function

class RetryComposer : ObservableTransformer<RxStreamSuccess, RxStreamSuccess> {

val retryFunctionList = arrayListOf(RxStream404Retry("Test1"),

override fun apply(upstream: Observable<RxStreamSuccess>): ObservableSource<RxStreamSuccess> {
    retryFunctionList.forEach {

    return upstream

My subscribing chain looks like this :

  streamCache[stremId] =   observable
        .doOnSubscribe { listener.onLoading() }
        .doOnComplete {
            Log.d(" Retry onComplete")
            streamCache.remove(stremId) }
            { result -> listener.onSuccess(result) },
            { throwable ->

When I test with an observable that goes into error nothing happen my RxStream404Retry is not trigger. Can you not put more thant one retryWhen per observable ?

Thank a lot


  • I think the issue comes from:

    retryFunctionList.forEach {
        upstream.retryWhen(it) <- this returns a new Observable that is not attached to any subscriber

    This code is equivalent to:

    Observable obs1 = upstream.retryWhen(RxStream404Retry("Test1"))
    Observable obs2 = upstream.retryWhen(RxStream500Retry("Test2"))
    return upstream

    So, these observables are not subscribed by the subscriber of the main Rx chain. You may have look at the amb() operators for that (

    You may try something like:

    return upstream.retryWhen(amb(retryFunctionList)) // pseudo code

    That would be the rough idea.