kotlin thread-safety rx-java rx-java2 reactivex

Use of AtomicInteger required in RetryWhen


In the Javadoc example of Observable.retryWhen, an AtomicInteger is used for the counter instead of a simpler regular Int. Is this actually necessary? Under what circumstances can errors be emitted on a different thread?

My reading of the docs and source indicates that the takeWhile and flatMap closures are always guaranteed to run on the same thread.

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#retryWhen-io.reactivex.functions.Function-

Observable.timer(1, TimeUnit.SECONDS)
     .doOnSubscribe(s -> System.out.println("subscribing"))
     .map(v -> { throw new RuntimeException(); })
     .retryWhen(errors -> {
         AtomicInteger counter = new AtomicInteger();
         return errors
                   .takeWhile(e -> counter.getAndIncrement() != 3)
                   .flatMap(e -> {
                       System.out.println("delay retry by " + counter.get() + " second(s)");
                       return Observable.timer(counter.get(), TimeUnit.SECONDS);
                   });
     })
     .blockingSubscribe(System.out::println, System.out::println);

Solution

  • It is not strictly necessary, but some get a heart attack when they see a one-element int array used for the counter, hence the AtomicInteger.

     Observable.timer(1, TimeUnit.SECONDS)
     .doOnSubscribe(s -> System.out.println("subscribing"))
     .map(v -> { throw new RuntimeException(); })
     .retryWhen(errors -> {
         int[] counter = { 0 };
         return errors
                   .takeWhile(e -> counter[0]++ != 3)
                   .flatMap(e -> {
                       System.out.println("delay retry by " + counter[0] + " second(s)");
                       return Observable.timer(counter[0], TimeUnit.SECONDS);
                   });
     })
     .blockingSubscribe(System.out::println, System.out::println);
    

    Under what circumstances can errors emit on a different thread?

    The handler sequence can have its own threading, so whenever mutable state is shared with code outside it, you should make sure that access to that state is thread-safe. In the example, again, this is not necessary: the particular combination of operators that uses the counter runs on a single thread and is guaranteed not to overlap with itself, because a new error can only happen after the current sequence has signaled the retry.
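
To make the distinction concrete without pulling in RxJava, here is a minimal sketch using only the JDK. It is an analogy under stated assumptions, not RxJava's implementation: `CounterDemo`, `serializedCount`, and `concurrentCount` are hypothetical names invented for this illustration. The first method models access that moves between threads but never overlaps (each step finishes before the next starts, similar in spirit to a retry that can only fail again after the previous retry was signaled). The second models truly overlapping access, where an AtomicInteger is needed.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class for illustration; not part of RxJava.
class CounterDemo {

    // Serialized access: every step runs on a fresh thread, but the next
    // step starts only after the previous one has finished. Thread.start()
    // and Thread.join() establish happens-before edges, so a plain
    // one-element int array is enough here.
    static int serializedCount(int steps) {
        int[] counter = { 0 };
        for (int i = 0; i < steps; i++) {
            Thread t = new Thread(() -> counter[0]++);
            t.start();
            join(t);
        }
        return counter[0];
    }

    // Overlapping access: all worker threads increment at the same time,
    // so the counter must be atomic to avoid lost updates.
    static int concurrentCount(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            join(w);
        }
        return counter.get();
    }

    // Waits for a thread to finish, preserving the interrupt flag on failure.
    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializedCount(3));         // prints 3
        System.out.println(concurrentCount(4, 10_000)); // prints 40000
    }
}
```

If the plain array were used in the overlapping case instead, the final count could come out lower than expected because concurrent `counter[0]++` operations can overwrite each other. That is the situation to watch for when a retryWhen handler shares its counter with code that may run concurrently.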