Tags: java, multithreading, concurrency, rx-java, rx-java2

Setting and reading an instance variable within an RxJava chain from different Schedulers


I am not sure whether it is safe to read and write instance variables from RxJava chains that run on different schedulers. Here is a small example:


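// RxJava 2 package names, assumed from the rx-java2 tag
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.Callable;

public class RxJavaThreadSafety {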

    private int variable = 0;

    // First call 
    public void doWriting() {
        Single.just(255)
                .doOnSuccess(
                        newValue -> variable = newValue
                )
                .subscribeOn(Schedulers.io())
                .subscribe();
    }

    // Second call
    public void doReadingRxChain() {
        Single.fromCallable((Callable<Integer>) () -> variable)
                .subscribeOn(Schedulers.computation())
                .subscribe(
                        result -> System.out.println(result)
                );
    }

    // Third call
    public void doReading() {
        System.out.println(variable);
    }

}

For simplicity, let's assume that these three methods are called one after another.

My question: is it thread safe to set variable on the io scheduler and later read it from the computation scheduler or from the main thread?

I think it is not thread safe, but I would like RxJava and concurrency experts to confirm it.


Solution

  • No, this is not thread safe.

    When you use subscribeOn, calling subscribe() adds the task that produces the item to the work queue of a scheduler.

    The doWriting() and doReadingRxChain() methods add tasks to different schedulers. There is no guarantee that the chain in doWriting() will even start to run before the chain in doReadingRxChain(). This can happen, for example, if the io worker thread is scheduled later than the computation worker thread.
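
The ordering point can be reproduced without RxJava. The sketch below is not RxJava code: it assumes two single-thread executors as stand-ins for the two schedulers, and the class name, pool names, and latch are hypothetical. The writer pool is held up on purpose, so the read that was submitted second completes first and sees the old value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmissionOrderSketch {

    // volatile so that the read is well defined; it does not affect task ordering
    private volatile int variable = 0;

    int current() {
        return variable;
    }

    // Submits the write first and the read second, then returns what the read saw.
    int readWhileWriterIsDelayed() {
        ExecutorService writerPool = Executors.newSingleThreadExecutor();
        ExecutorService readerPool = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Occupy the only writer thread so the write task waits in the queue.
            writerPool.submit(() -> { gate.await(); return null; });
            Future<?> write = writerPool.submit(() -> { variable = 255; });
            Future<Integer> read = readerPool.submit(() -> variable);

            int seen = read.get();   // the read finishes while the write is still queued
            gate.countDown();        // now let the writer pool make progress
            write.get();             // wait until the write has actually happened
            return seen;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            writerPool.shutdown();
            readerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        SubmissionOrderSketch sketch = new SubmissionOrderSketch();
        System.out.println(sketch.readWhileWriterIsDelayed()); // 0
        System.out.println(sketch.current());                  // 255
    }
}
```

The delay here is forced with a latch to make the outcome repeatable. With real schedulers the same reordering happens only sometimes, which is what makes it hard to notice.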

    There is a more fundamental problem: you are writing the value of variable in one thread and reading it in another. Without any concurrency controls, nothing guarantees that the new value of variable is seen by the thread reading it. One way to fix that is declaring the variable as volatile:

    private volatile int variable = 0;
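
Note that volatile only makes a completed write visible. It does not make the write happen before the read. If the reader must see the new value, the read has to be sequenced after the write. The following is a minimal sketch of that idea using plain java.util.concurrent instead of RxJava; the class name, the method name, and the two executors standing in for the io and computation schedulers are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OrderedVisibilitySketch {

    // volatile makes a completed write visible to other threads;
    // it does not decide which task runs first
    private volatile int variable = 0;

    // Writes on one pool, then reads on another. The read stage is chained
    // to the write stage, so it starts only after the write has finished.
    int writeThenRead() {
        ExecutorService ioLike = Executors.newSingleThreadExecutor();
        ExecutorService computationLike = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture
                    .runAsync(() -> variable = 255, ioLike)
                    .thenApplyAsync(ignored -> variable, computationLike)
                    .join();
        } finally {
            ioLike.shutdown();
            computationLike.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(new OrderedVisibilitySketch().writeThenRead()); // 255
    }
}
```

In RxJava terms, the closest equivalent would probably be to put the write and the read into one chain rather than two independent subscribe() calls, so that the read step runs only after the write step has completed.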