Search code examples
reactive-programmingrx-java2rx-kotlin

Using range in zipWith also emits all items from range sequence before zipper function applied


The question is about RxJava2.

Noticed that zipping Throwable that comes from retryWhen with range emits all items from Observable.range before zipper function has been applied. Also, range emits sequence even if zipWith wasn't called. For example this source code

Observable.create<String> {
        println("subscribing")
        it.onError(RuntimeException("always fails"))
    }
    .retryWhen {
        it.zipWith(Observable.range(1, 3).doOnNext { println("range $it") },
                BiFunction { t: Throwable, i: Int -> i })
                .flatMap {
                    System.out.println("delay retry by $it + second(s)")
                    Observable.timer(it.toLong(), TimeUnit.SECONDS)
                }
    }./*subscribe*/

gives the following result

 range 1
 range 2
 range 3
 subscribing
 delay retry by 1 + second(s)
 subscribing
 delay retry by 2 + second(s)
 subscribing
 delay retry by 3 + second(s)
 subscribing
 onComplete

Replacing onError in observable creation also don't eliminate emitting range items. So the question is why it's happening as Range is cold.


Solution

  • Observables in 2.x don't have backpressure thus a range operator will emit all its items as soon as it can. Your case, however, can use a normal counter incremented along the error notification of the retry handler:

    source.retryWhen(e -> {
        int[] counter = { 0 };
        return e.takeWhile(v -> ++counter[0] < 4)
                .flatMap(v -> Observable.timer(counter[0], TimeUnit.SECONDS));
    })