Search code examples
androidmobilesystem.reactivepolling

RX Android Observable.interval() causes OOM


I should probably start with I'm a newbie in the android RX world. I have a polling method where I use Observable.interval() to poll once a second:

Observable.interval(0, 1, TimeUnit.SECONDS)
                    .timeInterval()
                    .subscribeOn(Schedulers.io())
                    .subscribe(new Consumer<Timed<Long>>() {
                        @Override
                        public void accept(@NonNull Timed<Long> longTimed) throws Exception 
                            {
                              presenter.poll()
                            }

                        }
                    });

Now this works fine for a while but after I get an out of memory exception. My guess is that I'm creating too many threads very fast and the GC can't purge them in time.

1638 W System.err:  at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
04-09 09:19:55.634  1543  1638 W System.err:    at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:42)
04-09 09:19:55.634  1543  1638 W System.err:    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:462)
04-09 09:19:55.634  1543  1638 W System.err:    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:307)
04-09 09:19:55.634  1543  1638 W System.err:    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:302)
04-09 09:19:55.634  1543  1638 W System.err:    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
04-09 09:19:55.634  1543  1638 W System.err:    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
04-09 09:19:55.634  1543  1638 W System.err:    at java.lang.Thread.run(Thread.java:919)
04-09 09:19:55.634  1543  1638 W System.err: Caused by: java.lang.OutOfMemoryError: pthread_create (1040KB stack) failed: Try again
04-09 09:19:55.634  1543  1638 W System.err:    at java.lang.Thread.nativeCreate(Native Method)
04-09 09:19:55.634  1543  1638 W System.err:    at java.lang.Thread.start(Thread.java:883)
04-09 09:19:55.634  1543  1638 W System.err:    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:975)
04-09 09:19:55.634  1543  1638 W System.err:    at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1617)
04-09 09:19:55.634  1543  1638 W System.err:    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:342)
04-09 09:19:55.634  1543  1638 W System.err:    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:579)
04-09 09:19:55.634  1543  1638 W System.err:    at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:680)
04-09 09:19:55.634  1543  1638 W System.err:    at io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:146)
04-09 09:19:55.634  1543  1638 W System.err:    at io.reactivex.internal.schedulers.IoScheduler$EventLoopWorker.schedule(IoScheduler.java:230)
04-09 09:19:55.634  1543  1638 W System.err:    at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:136)
04-09 09:19:55.634  1543  1638 W System.err:    at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:112)
04-09 09:19:55.635  1543  1638 W System.err:    at io.reactivex.internal.operators.single.SingleSubscribeOn.subscribeActual(SingleSubscribeOn.java:37)
04-09 09:19:55.635  1543  1638 W System.err:    at io.reactivex.Single.subscribe(Single.java:3096)
04-09 09:19:55.635  1543  1638 W System.err:    at mypackagename.Presenter.poll(Presenter.java:83)

Is there any way I could reuse a scheduler? Or manually delete/ close it after I polled?


Solution

  • Declare a variable of Disposable type: So that if we need to stop this then this will be used.

    Disposable disposable;
    

    And after that just follow the code below:

    disposable = Observable.interval(1, TimeUnit.SECONDS)
                        .doOnNext(t -> YOUR_FUNCTION_NAME())
                        .subscribe();
    

    and when you need to stop just use the code below:

      if (!disposable.isDisposed()) {
                    disposable.dispose();
    

    NOTE: Replace YOUR_FUNCTION_NAME with your function name.