Tags: rx-java, observable, scheduler, rx-java2, emitter

How to correctly control emissions for defined Scheduler in created Observable


I am using RxJava2 and let's say I have this Observable:

Observable
   .create(emitter -> 
      SomeDependency.registerCallback(data -> emitter.onNext(data))
   )
   .subscribeOn(Schedulers.io());

It observes some asynchronous logic and emits whatever it receives from it. The important point is that the registered callback delivers its data on a thread managed by SomeDependency. As a consequence, all of the emitter's emissions reach downstream on that thread, ignoring the Scheduler passed to subscribeOn().

In https://stackoverflow.com/a/43283760/1618316 there is a hint from @akarnokd that points to a way of redirecting data to the correct thread using a Scheduler.Worker. A modified example of that approach would look like this:

Observable
   .create(emitter -> {
      final Worker worker = Schedulers.trampoline().createWorker();
      emitter.setDisposable(worker);
      SomeDependency.registerCallback(data -> 
         worker.schedule(() -> emitter.onNext(data))
      );
   })
   .subscribeOn(Schedulers.io());

NOTE: my assumption is that trampoline() creates a Scheduler for the current thread, which in our case is the io() thread, since that is where the Observable is created.

The thing is that when creating this kind of Observable, you usually need not only registerCallback() but also unregisterCallback(). In common scenarios, you put unregisterCallback() into the emitter's Disposable. However, as you can see, our emitter already has a Disposable, so another one cannot be set: if a second Disposable is set, the previous one is replaced and disposed.

Do you have any ideas on how to approach this problem, please?


Solution

  • I think what you need is CompositeDisposable.

    A disposable container that can hold onto multiple other disposables.
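
    A minimal sketch of how that could look with RxJava 2, under a few assumptions: the Source interface below is a hypothetical stand-in for SomeDependency (its real API is not shown in the question), fromCallback() is an illustrative helper name, and the Scheduler is passed in as a parameter. Both the Worker and the unregister action go into one CompositeDisposable, which is then set on the emitter once. Note that a trampoline() Worker runs tasks on whichever thread calls schedule(), so if the goal is to leave the callback thread, a Worker from Schedulers.io() (or another threaded Scheduler) is probably what you want.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposables;
import java.util.function.Consumer;

public class CallbackObservable {

    // Hypothetical stand-in for SomeDependency; the real API may differ.
    public interface Source<T> {
        void registerCallback(Consumer<T> callback);
        void unregisterCallback(Consumer<T> callback);
    }

    // Illustrative helper: wraps a callback source and re-emits its data
    // through a Worker of the given Scheduler.
    public static <T> Observable<T> fromCallback(Source<T> source, Scheduler scheduler) {
        return Observable.<T>create(emitter -> {
            Scheduler.Worker worker = scheduler.createWorker();

            // Each callback invocation is handed over to the worker.
            Consumer<T> callback = data -> worker.schedule(() -> emitter.onNext(data));

            // One container holds every resource that must be released on dispose.
            CompositeDisposable resources = new CompositeDisposable();
            resources.add(worker);
            resources.add(Disposables.fromAction(() -> source.unregisterCallback(callback)));

            // A single setDisposable() call, so nothing gets replaced and disposed early.
            emitter.setDisposable(resources);

            source.registerCallback(callback);
        });
    }
}
```

    Usage would then be something like CallbackObservable.fromCallback(someSource, Schedulers.io()).subscribe(...). Disposing the subscription disposes the Worker and unregisters the callback in one step.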