I'm trying to figure out a way to eliminate mutable state and therefore possible race condition. But I can't seem to figure out how to somehow "intertwine" two Observables, while also using "scan".
Hopefully by showing more code I can give you the idea:
private val stateRelay: BehaviorRelay<State> = BehaviorRelay.createDefault(initialState ?: DEFAULT_STATE) // maybe this should be `Observable.startWith()` somehow?
fun bindIntents(intents: Observable<Actions>, stateRenderer: StateRenderer) {
compositeDisposable += intents.concatMap { action ->
when (action) {
is Actions.Increment -> {
Observable.create<Change> { emitter ->
// emit things
is Actions.Decrement -> {
Observable.create<Change> { emitter ->
// emit things
}.map { change ->
reducer(stateRelay.value, change) // TODO: figure out how to use scan() here, instead of stateRelay.value! :(
}.subscribeBy { newState ->
stateRelay.accept(newState) // there is a chance that the relay shouldn't be here if scan is used
compositeDisposable +=
stateRelay // TODO: figure out how to use scan() instead of a relay!
.subscribeBy { state ->
fun unbindIntents() {
So I'm receiving a Observable<Actions>
in this method, which is technically a PublishRelay
on the other side (this should be fine).
However, somehow I'm supposed to replace the BehaviorRelay
with Observable.scan()
(possibly with startWith
) to eliminate the mutable state, but I can't seem to wrap my head around what I'm supposed to do for that to happen.
As for the types involved, in case they are needed:
private typealias Reducer = (state: State, change: Change) -> State
private typealias StateRenderer = (state: State) -> Unit
data class State(val count: Int): Parcelable
How could I wrap intents.concatMap.map
, as part of an Observable.scan()
(with possibly startWith()
and replay(1)
), to eliminate my usage of the BehaviorSubject?
I'll elaborate on my comment above. This is a simple rewrite of your code to do what you're asking for.
fun bindIntents(intents: Observable<Actions>, stateRenderer: StateRenderer) {
val stateObservable = intents.concatMap { action ->
when (action) {
is Actions.Increment -> {
Observable.create<Change> { emitter ->
// emit things
is Actions.Decrement -> {
Observable.create<Change> { emitter ->
// emit things
}.scan(initialState, { currentState, change -> reducer(currentState, change)})
compositeDisposable +=
.subscribeBy { state ->
note that this can be simplified further by inlining the observable I assign to stateObservable
in the expression below and using a method reference as the second argument to scan like this
fun bindIntents(intents: Observable<Actions>, stateRenderer: StateRenderer) {
compositeDisposable +=
intents.concatMap { action ->
when (action) {
is Actions.Increment -> {
Observable.create<Change> { emitter ->
// emit things
is Actions.Decrement -> {
Observable.create<Change> { emitter ->
// emit things
}.scan(initialState, this::reducer)
.subscribeBy { state ->