I have the following code:
private static void log(Object msg) {
    System.out.println(
        Thread.currentThread().getName() +
        ": " + msg);
}

Observable<Integer> naturalNumbers = Observable.create(emitter -> {
    log("Invoked"); // on main thread
    Runnable r = () -> {
        log("Invoked on another thread");
        int i = 0;
        while (!emitter.isDisposed()) {
            log("Emitting " + i);
            emitter.onNext(i);
            i += 1;
        }
    };
    new Thread(r).start();
});
Disposable disposable = naturalNumbers.subscribe(i -> log("Received " + i));
So here we have two important lambda expressions. The first is the one we pass to Observable.create; the second is the callback we pass to Observable.subscribe(). In the first lambda, we create a new thread and then emit values on that thread. In the second lambda, we have the code that receives the values emitted by the first lambda. I observe that both pieces of code are executed on the same thread.
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
Thread-0: Received 1
Thread-0: Emitting 2
Thread-0: Received 2
Why is that? Does RxJava by default run the code emitting values (the observable) and the code receiving values (the observer) on the same thread?
Let's see what happens if you use a Thread to execute a Runnable:
@Test
void threadTest() throws Exception {
    log("main");
    CountDownLatch countDownLatch = new CountDownLatch(1);
    new Thread(
            () -> {
                log("thread");
                countDownLatch.countDown();
            })
        .start();
    countDownLatch.await();
}
main: main
Thread-0: thread
It seems that the main entry point is called from the main thread and the newly created Thread is called Thread-0.
Why is that? Does RxJava by default run the code emitting values (the observable) and the code receiving values (the observer) on the same thread?
By default, RxJava is single-threaded. Unless you specify otherwise with observeOn, subscribeOn, or a different threading layout, the producer emits values on the consumer's (subscriber's) thread. This is because RxJava runs everything on the subscribing call stack by default.
@Test
void fdskfkjsj() throws Exception {
    log("main");
    Observable<Integer> naturalNumbers =
        Observable.create(
            emitter -> {
                log("Invoked"); // on main thread
                Runnable r =
                    () -> {
                        log("Invoked on another thread");
                        int i = 0;
                        while (!emitter.isDisposed()) {
                            log("Emitting " + i);
                            emitter.onNext(i);
                            i += 1;
                        }
                    };
                new Thread(r).start();
            });
    Disposable disposable = naturalNumbers.subscribe(i -> log("Received " + i));
    Thread.sleep(100);
    disposable.dispose(); // ends the while loop so Thread-0 can terminate
}
main: main
main: Invoked
Thread-0: Invoked on another thread
Thread-0: Emitting 0
Thread-0: Received 0
Thread-0: Emitting 1
In your example, the main method is clearly called from the main thread, and the subscribeActual call also runs on the calling thread (main). The Observable#create lambda, however, calls onNext from the newly created thread Thread-0. A value is always pushed to the subscriber on the thread that calls onNext. Here that thread is Thread-0, because it is the one calling onNext on the downstream subscriber.
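The same effect can be reproduced without RxJava, which may make it clearer that nothing special is going on: a callback is an ordinary method call and runs on whichever thread invokes it. The following is only a sketch using plain JDK types; CallbackThreadDemo, emitFromNewThread and receivingThreads are hypothetical names made up for this illustration, and the thread name "producer" is chosen here for readability.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.IntConsumer;

// Hypothetical stand-in for an Observable: all it does is call a callback.
final class CallbackThreadDemo {

    // Starts a new thread and calls onNext from it, similar to the
    // Observable.create lambda in the question.
    static Thread emitFromNewThread(IntConsumer onNext, int count) {
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                onNext.accept(i); // plain method call, so it runs on "producer"
            }
        }, "producer");
        producer.start();
        return producer;
    }

    // Returns the name of the thread that executed the callback, once per value.
    static List<String> receivingThreads(int count) {
        List<String> names = new CopyOnWriteArrayList<>();
        Thread producer = emitFromNewThread(
                value -> names.add(Thread.currentThread().getName()), count);
        try {
            producer.join(); // wait until every value has been delivered
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName() + ": subscribing");
        System.out.println("callback ran on: " + receivingThreads(3));
    }
}
```

Every recorded name is "producer", never the thread that registered the callback, which mirrors the Thread-0 lines in the output above.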
Use the observeOn / subscribeOn operators to handle concurrency in RxJava.
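As a rough illustration of those two operators, the sketch below records which thread each stage runs on. It assumes RxJava 3 is on the classpath (package io.reactivex.rxjava3; in RxJava 2 the same classes live under io.reactivex). The class name SchedulerOperatorsDemo and the choice of Schedulers.io() and Schedulers.single() are assumptions made for this example, not something the question prescribes.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; assumes RxJava 3.
final class SchedulerOperatorsDemo {

    // Records "<stage>@<thread name>" for each value of a three-element stream.
    static List<String> run() {
        List<String> trace = new CopyOnWriteArrayList<>();
        Observable.range(0, 3)
                // Above subscribeOn: runs on the thread the source is subscribed on.
                .doOnNext(i -> trace.add("emit@" + Thread.currentThread().getName()))
                // Subscribe to the source (and therefore emit) on an io scheduler thread.
                .subscribeOn(Schedulers.io())
                // Deliver everything below this operator on the single scheduler.
                .observeOn(Schedulers.single())
                .doOnNext(i -> trace.add("receive@" + Thread.currentThread().getName()))
                // Block the calling thread until the stream completes.
                .blockingSubscribe();
        return trace;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "emit" entries should carry the name of an io scheduler thread and the "receive" entries the name of the single scheduler thread; neither should be the thread that called run().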
No, you should not use new Thread to separate the producer from the consumer. Doing so makes it quite easy to break the contract that onNext must not be called concurrently (interleaved). This is why RxJava provides a construct called Scheduler with Workers to prevent such mistakes.
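The idea of delivering calls strictly one after another can be approximated with a JDK single-threaded executor. This is an analogy only, not how RxJava implements its Workers, and SerializedDeliveryDemo and maxConcurrentCalls are hypothetical names. Two threads submit fake onNext calls, but because a single executor thread runs them, no two calls ever overlap.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: a single-threaded executor as a stand-in for a Worker.
final class SerializedDeliveryDemo {

    // Two threads each submit `tasks` fake onNext calls to one single-threaded
    // executor. Returns the highest number of calls that were ever running at
    // the same time.
    static int maxConcurrentCalls(int tasks) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();

        Runnable onNext = () -> {
            int now = inFlight.incrementAndGet(); // entering the callback
            max.accumulateAndGet(now, Math::max); // remember the peak
            inFlight.decrementAndGet();           // leaving the callback
        };
        Runnable submitter = () -> {
            for (int i = 0; i < tasks; i++) {
                worker.execute(onNext); // hand the call over instead of running it here
            }
        };

        Thread a = new Thread(submitter);
        Thread b = new Thread(submitter);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
            worker.shutdown(); // already queued tasks still run
            worker.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent calls: " + maxConcurrentCalls(1000));
    }
}
```

If the two submitter threads invoked onNext directly instead of handing it to the executor, the peak could exceed 1, which is exactly the interleaving the contract forbids.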
Note:
I think this article describes it quite well: http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html . Please note that it covers Rx.NET, but the principle is much the same. If you want to read about concurrency with RxJava, you could also look at David's blog (https://akarnokd.blogspot.com/2015/05/schedulers-part-1.html) or read the book Reactive Programming with RxJava (https://www.oreilly.com/library/view/reactive-programming-with/9781491931646/).