In the Javadoc example of Observable.retryWhen
, AtomicInteger
is used for counter
instead of a simpler regular Int
. Is this actually necessary? Under what circumstances can errors
emit on a different thread?
My reading the docs and source indicate that the takeWhile
and flatMap
closures are always guaranteed to run on same thread.
Observable.timer(1, TimeUnit.SECONDS)
.doOnSubscribe(s -> System.out.println("subscribing"))
.map(v -> { throw new RuntimeException(); })
.retryWhen(errors -> {
AtomicInteger counter = new AtomicInteger();
return errors
.takeWhile(e -> counter.getAndIncrement() != 3)
.flatMap(e -> {
System.out.println("delay retry by " + counter.get() + " second(s)");
return Observable.timer(counter.get(), TimeUnit.SECONDS);
});
})
.blockingSubscribe(System.out::println, System.out::println);
It is not strictly necessary but some get a heart attack when they see a one element int array used for the counter, hence the AtomicInteger
.
Observable.timer(1, TimeUnit.SECONDS)
.doOnSubscribe(s -> System.out.println("subscribing"))
.map(v -> { throw new RuntimeException(); })
.retryWhen(errors -> {
int[] counter = { 0 };
return errors
.takeWhile(e -> counter[0]++ != 3)
.flatMap(e -> {
System.out.println("delay retry by " + counter[0] + " second(s)");
return Observable.timer(counter[0], TimeUnit.SECONDS);
});
})
.blockingSubscribe(System.out::println, System.out::println);
Under what circumstances can errors emit on a different thread?
The handler sequence can have its own threading so whenever you have shared external access to mutable state, you should make sure accessing that is thread safe. In the example, again, it is not necessary as the particular combination during the usage of the counter is running on a single thread and guaranteed not overlapping itself as any new error can only happen after the current sequence signaled the retry to happen.