I am not sure about safety of reading/writing instance variables from rxJava chain with different schedulers. There is a small example
public class RxJavaThreadSafety {
private int variable = 0;
// First call
public void doWriting() {
Single.just(255)
.doOnSuccess(
newValue -> variable = newValue
)
.subscribeOn(Schedulers.io())
.subscribe();
}
// Second call
public void doReadingRxChain() {
Single.fromCallable((Callable<Integer>) () -> variable)
.subscribeOn(Schedulers.computation())
.subscribe(
result -> System.out.println(result)
);
}
// Third call
public void doReading() {
System.out.println(variable);
}
}
For simplicity lets assume that these three methods called one after another
My question: Does it thread safe to set variable "in" io scheduler, and lately read this variable "from" computation scheduler or main thread?
I think that is not thread safe, but i want some rxJava and concurrency experts to prove it
No, this is not thread safe.
When you use subscribeOn
it means that calling subscribe()
adds the task for producing the item to the work queue of a scheduler.
The doWriting()
and doReadingRxChain()
methods add tasks to different schedulers. There is no guarantee that the chain in doWriting()
will even start to run before doReadingRxChain()
. This can happen for example if all IO threads are busy.
There is a more fundamental problem: you are writing the value of variable
in one thread and reading it in another. Without any concurrency controls, nothing guarantees that the new value of variable
is seen by the thread reading it. One way to fix that is declaring the variable as volatile
:
private volatile int variable = 0;