Search code examples
javarx-java3

Thread execution of value emitting code and value receiving code in RxJava


I have following code:

private static void log(Object msg) {
        System.out.println(
                Thread.currentThread().getName() +
                        ": " + msg);
}

Observable<Integer> naturalNumbers = Observable.create(emitter -> {
            log("Invoked"); // on main thread
            Runnable r = () -> {
                log("Invoked on another thread");
                int i = 0;
                while(!emitter.isDisposed()) {
                    log("Emitting "+ i);
                    emitter.onNext(i);
                    i += 1;
                }
            };
            new Thread(r).start();
        });
Disposable disposable = naturalNumbers.subscribe(i -> log("Received "+i));

So here we have 2 important lambda expressions. First is the one we pass to Observable.create, second is the callback one we pass to Observable.subscribe(). In first lambda, we create a new thread and then emit values on that thread. In second lambda, we have the code to receive those values emitted in first lambda code. I observe that both code are executed on same thread.

Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
Thread-0: Received 1
Thread-0: Emitting 2
Thread-0: Received 2

Why is it so? Does RxJava by default run code emitting values(observable) and the code receiving values(observer) on same thread?


Solution

  • Let's see, what happens, if you use a Thread to execute a runnable:

    Test

    @Test
      void threadTest() throws Exception {
        log("main");
        CountDownLatch countDownLatch = new CountDownLatch(1);
    
        new Thread(
                () -> {
                  log("thread");
                  countDownLatch.countDown();
                })
            .start();
    
        countDownLatch.await();
      }
    

    Output

    main: main
    Thread-0: thread
    

    It seems, that the main entry point is called from main thread and the newly created Thread is called Thread-0.

    Why is it so? Does RxJava by default run code emitting values(observable) and the code receiving values(observer) on same thread?

    By default RxJava is single-threaded. Therefore the the producer, if not definied differently by observeOn, subscribeOn or different threading layout, will emit values on the consumer (subsriber)-thread. This is because RxJava runs everything on the subscribing stack by default.

    Example 2

    @Test
      void fdskfkjsj() throws Exception {
          log("main");
    
          Observable<Integer> naturalNumbers =
            Observable.create(
                emitter -> {
                  log("Invoked"); // on main thread
                  Runnable r =
                      () -> {
                        log("Invoked on another thread");
                        int i = 0;
                        while (!emitter.isDisposed()) {
                          log("Emitting " + i);
                          emitter.onNext(i);
                          i += 1;
                        }
                      };
                  new Thread(r).start();
                });
        Disposable disposable = naturalNumbers.subscribe(i -> log("Received " + i));
    
        Thread.sleep(100);
      }
    

    Output2

    main: main
    main: Invoked
    Thread-0: Invoked on another thread
    Thread-0: Emitting 0
    Thread-0: Received 0
    Thread-0: Emitting 1
    

    In your example it is apparent, that the main method is called from the main thread. Furthermore the subscribeActual call is also run on the calling-thread (main). But the Observable#create lambda calls onNext from the newly created thread Thread-0. The value is pushed to the subscriber from the calling thread. In this case, the calling thread is Thread-0, because it calls onNext on the downstream subscriber.

    How to separate producer from consumer?

    Use observeOn/ subscribeOn operators in order to handle concurrency in RxJava.

    Should I use low-level Thread constructs ẁith RxJava?

    No you should not use new Thread in order to seperate the producer from the consumer. It is quite easy to break the contract, that onNext can not be called concurrently (interleaving) and therefore breaking the contract. This is why RxJava provides a construct called Scheduler with Workers in order to mitigate such mistakes.

    Note: I think this article describes it quite well: http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html . Please note this is Rx.NET, but the principle is quite the same. If you want to read about concurrency with RxJava you could also look into Davids Blog (https://akarnokd.blogspot.com/2015/05/schedulers-part-1.html) or read this Book (Reactive Programming with RxJava https://www.oreilly.com/library/view/reactive-programming-with/9781491931646/)