I'm trying to implement redux state update pattern using RXJava
val subject=PublishSubject.create()
val subject1=PublishSubject.create()
// multiple threads posting
// on subject and subject1 here. Concurrently
subject.mergeWith(subject1)
.scan(
getInitState(),
{state, event ->
// state update here
}
)
.subscribe({state ->
// use state here
})
As you can see, I'm using scan
operator to maintain the state.
How can I be sure that the state updates happen sequentially even when multiple threads are producing events?
Is there some mechanism in scan
operator which makes the events stand in some queue while waiting for current state update function to finish?
What I have done:
I have successfully implemented this pattern in Android environment. It's really easy because if you always do the state update in
AndroidSchedulers.mainThread()
And make state object immutable you are guaranteed to have atomic and sequential state update. But what happens if you don't have dedicated scheduler for state updates? What if you are not on Android?
What I have researched:
I have read the source code for scan
operator and there is no
waiting "queue" involved. Just simple state update and emission
I have also read SerializedSubject source code. There indeed is a waiting queue which serializes emissions. But what happens if I have two subjects? Serializing both of them doesn't mean that they don't interfere with each other.
To force execution on a single thread, you can explicitly create a single thread scheduler to replace AndroidSchedulers.mainThread()
:
val singleThreadScheduler = Schedulers.single()
Even if the events are emitted on other threads, you can ensure you process them only on your single thread using observeOn
:
subject.mergeWith(subject1)
.observeOn(singleThreadScheduler)
.scan(
getInitState(),
{state, event ->
// state update here
}
)
.subscribe({state ->
// use state here
})
The difference between observeOn
and subscribeOn
can be pretty confusing, and logging the thread id can be useful to check everything is running on the thread you expect.