Search code examples
swiftkotlinrx-javastate-machinerx-swift

How to retain state in Rx without the scan function


I'm working on porting some of my View Models into (rough) Finite State Machines as my UI tends to fit that pattern rather well (Mealy/Moore, don't care for the purpose of this question). Additionally, when done well - state machines really clean up testing - as they prohibit certain test permutations from ever happening.

My current view models use RxSwift (and RxKotlin - depending on the app), and the underlying use cases (database calls, network calls, etc) also use Rx (hence why I need to stay in that ecosystem).

What I've discovered is that Rx is awesome, State Machines are awesome --> Rx + State Machines seem to be a bit of a hash to do anything non-trivial. For example, I know I can use the .scan operator to retain some state, IF my state machine was entirely synchronous (for example, something roughly like this in Swift):

enum Event {
    case event1
    case event2
    case event3
}

enum State {
    case state1
    case state2
    case state3

    func on(event: Event) -> State {
        switch (self, event) {
        case (.state1, .event1):
            // Do something
            return .state2

        case (.state2, .event2):
            // Do something
            return .state3

        default:
            return self // (or nil, or something)
        }
    }
}

func foo() -> Observable<State> {
    let events = Observable<Event>.of(.event1, .event2, .event3)
    return events.scan(State.state1) { (currentState, event) -> State in
        return currentState.on(event)
    }
}

But, what can I do if the return from my State.on function is an Observable (like a network call or something that takes a long time, which is already in Rx)?

enum State {
    case notLoggedIn
    case loggingIn
    case loggedIn
    case error

    func on(event: Event) -> Observable<State> {
        switch (self, event) {
        case (.notLoggedIn, .event1):
            return api.login(credentials)
                .map({ (isLoggedIn) -> State in
                    if isLoggedIn {
                        return .loggedIn
                    }
                    return .error
                })
                .startWith(.loggingIn)

        ... other code ...

        default:
            return self
        }
    }
}

I've tried making the .scan operator take in an Observable accumulator, but the result of this code is that the state machine is subscribed to or run too many times. I guess because it runs on each state in the observable that is accumulating.

return events.scan(Observable.just(State.state1)) { (currentState, event) -> Observable<State> in
    currentState.flatMap({ (innerState) -> Observable<State> in
        return innerState.on(event: event)
    })
}.flatMap { (states) -> Observable<State> in
    return states
}

I think, if I could manage to cleanly pull the state variable back in, the simplest implementation could look like this:

return events.flatMapLatest({ (event) -> Observable<State> in
    return self.state.on(event: event)
        .do(onNext: { (state) in
            self.state = state
        })
})

But, pulling from a private state variable into an observable stream, and updating it - well, not only is it ugly, I feel like I'm just waiting to be hit by a concurrency bug.


Edit: Based on feedback from Sereja Bogolubov - I've added a Relay and come up with this code - still not great, but getting there.

let relay = BehaviorRelay<State>(value: .initial)
...

func transition(from state: State, on event: Event) -> Observable<State> {
    switch (state, event) {
    case (.notLoggedIn, .event1):
        return api.login(credentials)
            .map({ (isLoggedIn) -> State in
                if isLoggedIn {
                    return .loggedIn
                }
                return .error
            })
            .startWith(.loggingIn)

    ... other code ...

    default:
        return self
    }
}

return events.withLatestFrom(relay.asObservable(), resultSelector: { (event, state) -> Observable<State> in
    return self.transition(from: state, on: event)
        .do(onNext: { (state) in
            self.relay.accept(state)
        })
}).flatMap({ (states) -> Observable<State> in
    return states
})

The relay (or replay subject or whatever) is updated in a doOnNext from the result of the state transition... This still feels like it could cause a concurrency problem, but not sure what else would work.


Solution

  • No, you don't have to be entirely sync to maintain arbitrary complex state. Yes, there are ways to achive needed behavior without scan. How about the withLatestFrom, where other is your current state (i.e. a separate Observable<MyState>, but you would need ReplaySubject<MyState> under the hood).

    Let me know if you need more details.


    Proof of concept, javascript:

    const source = range(0, 10);
    const state = new ReplaySubject(1);
    const example = source.pipe(
      withLatestFrom(state), // that's the way you read actual state
      map(([n, currentState]) => {
        state.next(n); // that's the way you change the state
        return ...
      })
    );
    

    Please be aware that more sophisticated cases (like race conditions risky) might require something at least as complex as combineLatest and approp. Scheduler's in place.