I'm working on porting some of my View Models into (rough) Finite State Machines as my UI tends to fit that pattern rather well (Mealy/Moore, don't care for the purpose of this question). Additionally, when done well - state machines really clean up testing - as they prohibit certain test permutations from ever happening.
My current view models use RxSwift (and RxKotlin - depending on the app), and the underlying use cases (database calls, network calls, etc) also use Rx (hence why I need to stay in that ecosystem).
What I've discovered is that Rx is awesome, State Machines are awesome --> Rx + State Machines seem to be a bit of a hash to do anything non-trivial. For example, I know I can use the .scan
operator to retain some state, IF my state machine was entirely synchronous (for example, something roughly like this in Swift):
enum Event {
case event1
case event2
case event3
}
enum State {
case state1
case state2
case state3
func on(event: Event) -> State {
switch (self, event) {
case (.state1, .event1):
// Do something
return .state2
case (.state2, .event2):
// Do something
return .state3
default:
return self // (or nil, or something)
}
}
}
func foo() -> Observable<State> {
let events = Observable<Event>.of(.event1, .event2, .event3)
return events.scan(State.state1) { (currentState, event) -> State in
return currentState.on(event)
}
}
But, what can I do if the return from my State.on
function is an Observable (like a network call or something that takes a long time, which is already in Rx)?
enum State {
case notLoggedIn
case loggingIn
case loggedIn
case error
func on(event: Event) -> Observable<State> {
switch (self, event) {
case (.notLoggedIn, .event1):
return api.login(credentials)
.map({ (isLoggedIn) -> State in
if isLoggedIn {
return .loggedIn
}
return .error
})
.startWith(.loggingIn)
... other code ...
default:
return self
}
}
}
I've tried making the .scan
operator take in an Observable accumulator, but the result of this code is that the state machine is subscribed to or run too many times. I guess because it runs on each state in the observable that is accumulating.
return events.scan(Observable.just(State.state1)) { (currentState, event) -> Observable<State> in
currentState.flatMap({ (innerState) -> Observable<State> in
return innerState.on(event: event)
})
}.flatMap { (states) -> Observable<State> in
return states
}
I think, if I could manage to cleanly pull the state
variable back in, the simplest implementation could look like this:
return events.flatMapLatest({ (event) -> Observable<State> in
return self.state.on(event: event)
.do(onNext: { (state) in
self.state = state
})
})
But, pulling from a private state variable into an observable stream, and updating it - well, not only is it ugly, I feel like I'm just waiting to be hit by a concurrency bug.
Edit: Based on feedback from Sereja Bogolubov - I've added a Relay and come up with this code - still not great, but getting there.
let relay = BehaviorRelay<State>(value: .initial)
...
func transition(from state: State, on event: Event) -> Observable<State> {
switch (state, event) {
case (.notLoggedIn, .event1):
return api.login(credentials)
.map({ (isLoggedIn) -> State in
if isLoggedIn {
return .loggedIn
}
return .error
})
.startWith(.loggingIn)
... other code ...
default:
return self
}
}
return events.withLatestFrom(relay.asObservable(), resultSelector: { (event, state) -> Observable<State> in
return self.transition(from: state, on: event)
.do(onNext: { (state) in
self.relay.accept(state)
})
}).flatMap({ (states) -> Observable<State> in
return states
})
The relay (or replay subject or whatever) is updated in a doOnNext
from the result of the state transition... This still feels like it could cause a concurrency problem, but not sure what else would work.
No, you don't have to be entirely sync to maintain arbitrary complex state.
Yes, there are ways to achive needed behavior without scan
. How about the withLatestFrom, where other
is your current state (i.e. a separate Observable<MyState>
, but you would need ReplaySubject<MyState>
under the hood).
Let me know if you need more details.
Proof of concept, javascript:
const source = range(0, 10);
const state = new ReplaySubject(1);
const example = source.pipe(
withLatestFrom(state), // that's the way you read actual state
map(([n, currentState]) => {
state.next(n); // that's the way you change the state
return ...
})
);
Please be aware that more sophisticated cases (like race conditions risky) might require something at least as complex as combineLatest and approp. Scheduler
's in place.