The question is about RxJava2.
Noticed that zipping Throwable
that comes from retryWhen
with range
emits all items from Observable.range
before zipper function has been applied. Also, range
emits sequence even if zipWith
wasn't called. For example this source code
Observable.create<String> {
println("subscribing")
it.onError(RuntimeException("always fails"))
}
.retryWhen {
it.zipWith(Observable.range(1, 3).doOnNext { println("range $it") },
BiFunction { t: Throwable, i: Int -> i })
.flatMap {
System.out.println("delay retry by $it + second(s)")
Observable.timer(it.toLong(), TimeUnit.SECONDS)
}
}./*subscribe*/
gives the following result
range 1
range 2
range 3
subscribing
delay retry by 1 + second(s)
subscribing
delay retry by 2 + second(s)
subscribing
delay retry by 3 + second(s)
subscribing
onComplete
Replacing onError
in observable
creation also don't eliminate emitting range
items. So the question is why it's happening as Range
is cold.
Observable
s in 2.x don't have backpressure thus a range
operator will emit all its items as soon as it can. Your case, however, can use a normal counter incremented along the error notification of the retry handler:
source.retryWhen(e -> {
int[] counter = { 0 };
return e.takeWhile(v -> ++counter[0] < 4)
.flatMap(v -> Observable.timer(counter[0], TimeUnit.SECONDS));
})