Search code examples
kotlinrx-java2behaviorsubject

Building MVI loop with RxJava: how to replace BehaviorSubject with scan()


I'm trying to figure out a way to eliminate mutable state and therefore possible race condition. But I can't seem to figure out how to somehow "intertwine" two Observables, while also using "scan".

Hopefully by showing more code I can give you the idea:

private val stateRelay: BehaviorRelay<State> = BehaviorRelay.createDefault(initialState ?: DEFAULT_STATE) // maybe this should be `Observable.startWith()` somehow?

fun bindIntents(intents: Observable<Actions>, stateRenderer: StateRenderer) {
    compositeDisposable += intents.concatMap { action ->
        when (action) {
            is Actions.Increment -> {
                Observable.create<Change> { emitter -> 
                   // emit things
                }
            }
            is Actions.Decrement -> {
                Observable.create<Change> { emitter -> 
                   // emit things
                }
            }
        }
    }.map { change ->
        reducer(stateRelay.value, change) // TODO: figure out how to use scan() here, instead of stateRelay.value! :(
    }.subscribeBy { newState ->
        stateRelay.accept(newState) // there is a chance that the relay shouldn't be here if scan is used
    }

    compositeDisposable += 
        stateRelay // TODO: figure out how to use scan() instead of a relay!
            .distinctUntilChanged()
            .subscribeBy { state ->
                stateRenderer(state)
            }
}

fun unbindIntents() {
    compositeDisposable.clear()
}

So I'm receiving a Observable<Actions> in this method, which is technically a PublishRelay on the other side (this should be fine).

However, somehow I'm supposed to replace the BehaviorRelay with Observable.scan() (possibly with startWith) to eliminate the mutable state, but I can't seem to wrap my head around what I'm supposed to do for that to happen.

As for the types involved, in case they are needed:

private typealias Reducer = (state: State, change: Change) -> State

private typealias StateRenderer = (state: State) -> Unit

@Parcelize
data class State(val count: Int): Parcelable

How could I wrap intents.concatMap.map, as part of an Observable.scan() (with possibly startWith() and replay(1)), to eliminate my usage of the BehaviorSubject?


Solution

  • I'll elaborate on my comment above. This is a simple rewrite of your code to do what you're asking for.

    fun bindIntents(intents: Observable<Actions>, stateRenderer: StateRenderer) {
        val stateObservable = intents.concatMap { action ->
            when (action) {
                is Actions.Increment -> {
                    Observable.create<Change> { emitter ->
                    // emit things
                    }
                }
                is Actions.Decrement -> {
                    Observable.create<Change> { emitter ->
                        // emit things
                    }
                }
            }
        }.scan(initialState, { currentState, change -> reducer(currentState, change)})
    
        compositeDisposable +=
            stateObservable
                    .distinctUntilChanged()
                    .subscribeBy { state ->
                        stateRenderer(state)
                    }
    }
    

    note that this can be simplified further by inlining the observable I assign to stateObservable in the expression below and using a method reference as the second argument to scan like this

    fun bindIntents(intents: Observable<Actions>, stateRenderer: StateRenderer) {
        compositeDisposable +=
                intents.concatMap { action ->
                    when (action) {
                        is Actions.Increment -> {
                            Observable.create<Change> { emitter ->
                                // emit things
                            }
                        }
                        is Actions.Decrement -> {
                            Observable.create<Change> { emitter ->
                                // emit things
                            }
                        }
                    }
                }.scan(initialState, this::reducer)
                        .distinctUntilChanged()
                        .subscribeBy { state ->
                            stateRenderer(state)
                        }
    }