rx-java2

Why is my thread count going up with rxjava retry error?


In function(), I throw a NullPointerException and print the thread count (System.out.println(Thread.activeCount())). Each time retry resubscribes after the error, the thread count increases by 1. Why is this happening? Should I assume the extra threads will just get garbage collected?

public void start(int time) {
    Observable.interval(time, TimeUnit.SECONDS)
            .doOnNext(t -> function())
            .doOnError(System.out::println)
            .retry()
            .subscribe();
}


Solution

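  • That interval overload runs on the computation Scheduler, which has a fixed pool of workers sized to the number of CPUs (Runtime.getRuntime().availableProcessors()) by default. Each resubscription triggered by retry is handed to another worker from that pool, and a worker's thread starts when it is first used, so the thread count rises with each retry until every worker has been used. After that the existing workers are reused, so the total thread count should not grow endlessly.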

    If you don't want that many worker threads, consider Schedulers.single(), which runs everything on one shared thread, or Schedulers.io(), which reuses workers:

    Observable.interval(time, TimeUnit.SECONDS, Schedulers.single())
            .doOnNext(t -> function())
            .doOnError(System.out::println)
            .retry()
            .subscribe();
    

    or

    Observable.interval(time, TimeUnit.SECONDS, Schedulers.io())
            .doOnNext(t -> function())
            .doOnError(System.out::println)
            .retry()
            .subscribe();
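
To illustrate why the thread count in the question climbs by one per retry and then levels off, here is a rough model that uses only java.util.concurrent. It is a simplified stand-in, not RxJava's actual implementation: the class FixedWorkerPoolModel, the method threadCountsAfter, and the pool size of 4 are all made up for this sketch. It assumes a fixed set of single-threaded workers that start their thread lazily and are handed out round-robin, which is the behavior described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for a fixed-size scheduler pool; NOT RxJava's real code.
class FixedWorkerPoolModel {

    // Returns the number of threads started after each simulated resubscription.
    static List<Integer> threadCountsAfter(int poolSize, int resubscriptions) {
        AtomicInteger threadsStarted = new AtomicInteger();
        ExecutorService[] workers = new ExecutorService[poolSize];
        for (int i = 0; i < poolSize; i++) {
            // A single-thread executor creates its thread lazily, on the first task.
            workers[i] = Executors.newSingleThreadExecutor(runnable -> {
                threadsStarted.incrementAndGet();
                Thread thread = new Thread(runnable);
                thread.setDaemon(true);
                return thread;
            });
        }

        List<Integer> counts = new ArrayList<>();
        try {
            for (int n = 0; n < resubscriptions; n++) {
                // Round-robin: each resubscription is handed the next worker.
                ExecutorService worker = workers[n % poolSize];
                worker.submit(() -> { }).get(); // run a no-op task and wait for it
                counts.add(threadsStarted.get());
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            for (ExecutorService worker : workers) {
                worker.shutdown();
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumed values for illustration: 4 workers, 8 resubscriptions.
        System.out.println(threadCountsAfter(4, 8)); // prints [1, 2, 3, 4, 4, 4, 4, 4]
    }
}
```

In this model the count stops at the pool size because later resubscriptions land on workers whose threads already exist. The threads are not garbage collected while the pool is alive; they are simply reused.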