
How to handle multiple subscriptions in a fair manner?


I need to handle different types of events strictly one at a time, but on a background thread.

According to the documentation, Schedulers.from(executor, false, true) should cover my requirements, but in practice it doesn't.

The code:

ExecutorService executor = Executors.newSingleThreadExecutor();
Scheduler scheduler = Schedulers.from(executor, false, true);
PublishSubject<String> subject1 = PublishSubject.create();
PublishSubject<String> subject2 = PublishSubject.create();

subject1.observeOn(scheduler).subscribe(log::info);
subject2.observeOn(scheduler).subscribe(log::info);

subject1.onNext("Hello11");
subject2.onNext("Hello21");
subject1.onNext("Hello12");
subject2.onNext("Hello22");
subject1.onNext("Hello13");
subject2.onNext("Hello23");

log.info("Test activity");

It produces the following output:

22:19:05.313 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Test activity
22:19:05.313 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello11
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello12
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello13
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello21
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello22
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello23

This shows that events are handled greedily: each Observer delivers all of its events before releasing the scheduler, which contradicts the documentation.

If .observeOn(scheduler) is replaced with .subscribeOn(scheduler), the output is as follows:

22:23:56.162 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello11
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello21
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello12
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello22
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello13
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello23
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Test activity

Here all events are handled on the calling thread, which contradicts the whole idea of .subscribeOn.

Is this a bug, or is there a way to make it behave as the documentation describes? The version is io.reactivex.rxjava3:rxjava:3.1.9.


Solution

  • This is not a bug in the Scheduler but a consequence of observeOn, which always operates greedily. In the first case, because all items of the first sequence were available to observeOn practically immediately, it emitted them on the same thread in a single executor run.

    You can use another operator that creates one task per item, such as delay with a zero delay, to get better interleaving:

    subject1.delay(0, TimeUnit.MILLISECONDS, scheduler).subscribe(log::info);
    subject2.delay(0, TimeUnit.MILLISECONDS, scheduler).subscribe(log::info);
    

    The second case works as intended, because applying subscribeOn to a Subject has no effect on where it delivers its items. In your case, the items were emitted, and thus processed, on the same thread as they would have been without subscribeOn.
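
To make the first point more concrete, here is a minimal sketch in plain java.util.concurrent that models the two strategies on a single-threaded executor. It is not RxJava's actual implementation. The class InterleavingSketch and the helpers greedyDrain, taskPerItem and run are hypothetical names made up for this illustration, and the gate task exists only to guarantee that all items are queued before the executor starts working, which mirrors the "available practically immediately" situation in the question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Illustrative model only; all names here are assumptions, not RxJava API.
public class InterleavingSketch {

    // Models an observeOn-style consumer: items go into a queue, and one
    // drain task empties the whole queue in a single executor run.
    static Consumer<String> greedyDrain(Executor executor, List<String> out) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        AtomicInteger wip = new AtomicInteger(); // work-in-progress counter
        return item -> {
            queue.offer(item);
            // Only the first pending item schedules a drain task.
            if (wip.getAndIncrement() == 0) {
                executor.execute(() -> {
                    int missed = 1;
                    do {
                        String next;
                        while ((next = queue.poll()) != null) {
                            out.add(next);
                        }
                        // Account for items that arrived while draining.
                        missed = wip.addAndGet(-missed);
                    } while (missed != 0);
                });
            }
        };
    }

    // Models a delay(0, ...)-style consumer: one executor task per item.
    static Consumer<String> taskPerItem(Executor executor, List<String> out) {
        return item -> executor.execute(() -> out.add(item));
    }

    static List<String> run(boolean greedy) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);
        // Block the worker thread until every item has been emitted.
        executor.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Consumer<String> first = greedy ? greedyDrain(executor, out) : taskPerItem(executor, out);
        Consumer<String> second = greedy ? greedyDrain(executor, out) : taskPerItem(executor, out);
        for (int i = 1; i <= 3; i++) {
            first.accept("Hello1" + i);
            second.accept("Hello2" + i);
        }
        gate.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        System.out.println("greedy drain:  " + run(true));
        System.out.println("task per item: " + run(false));
    }
}
```

In this model the greedy variant queues only two tasks, one drain per consumer, so the first consumer's three items are handled before any of the second's. The task-per-item variant queues six tasks in emission order, so the single worker thread alternates between the two consumers.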