I've got a function that will return an observable that will emit an enum based on what stage it is currently on. The enum basically goes like this:
/// The actions emitted to the view model, one per stage of the session flow.
enum ViewModelAction {
    /// Carries whether input is enabled.
    case inputEnabled(isEnabled: Bool)

    /// Carries whether the loading indicator is shown.
    case loadingIndicatorShown(isShown: Bool)

    /// Carries an optional error; `nil` means there is no error to show.
    case errorMessageShownIfAvailable(error: Error?)

    /// Carries the entity whose data the UI should load.
    case loadUIData(with entity: Entity)

    /// Carries the entity a session is started for.
    case startSession(for entity: Entity)

    /// Carries the entity a session is ended for.
    case endSession(for entity: Entity)

    /// Carries the entity whose end-of-session data the UI should load.
    case loadEndUIData(with entity: Entity)
}
Now, what I want the function to do is this:
If success -> go to 4
If error:
Loop and call API to poll the entity based on the session with the status of READY:
If success:
If error:
So in a way, for a successful run, it will emit this exact sequence:
// Expected output of a successful run: these actions, in this order, followed by completion.
// `Observable.of` emits its arguments in order and then completes, so no explicit
// completion element is needed.
Observable.of(
    ViewModelAction.inputEnabled(isEnabled: false),
    ViewModelAction.loadingIndicatorShown(isShown: true),
    ViewModelAction.loadUIData(with: entity),
    ViewModelAction.inputEnabled(isEnabled: true),
    ViewModelAction.loadingIndicatorShown(isShown: false),
    ViewModelAction.startSession(for: entity)
)
I have actually implemented this, but I think the result is unclear and overly complicated.
/// Creates and starts sessions; both operations report their outcome through an observable.
protocol SessionHandlerProtocol {
    /// Creates a session for the user with the given identifier.
    /// - Parameter userId: Identifier of the user the session is created for.
    /// - Returns: An observable of the created `SessionData`.
    func create(for userId: String) -> Observable<SessionData>

    /// Starts `session` for `entity`.
    /// - Parameters:
    ///   - session: The session to start.
    ///   - entity: The entity the session is started for.
    /// - Returns: An observable of `Void`.
    func start(session: SessionData, entity: Entity) -> Observable<Void>
}
/// Polls for the entity that belongs to a session and publishes results on `entity`.
protocol EntityPollerProtocol {
    /// Stream of polled entities; the element type is optional.
    var entity: Observable<Entity?> { get }

    /// Starts polling for `session`. Will poll and emit to the `entity` observable.
    func startPolling(for session: SessionData)

    /// Stops polling.
    func stopPolling()
}
/// Runs the create-session -> poll-entity -> start-session flow and reports every step
/// as a `ViewModelAction`.
class StartSessionStrategy {
    private let sessionHandler: SessionHandlerProtocol
    private let entityPoller: EntityPollerProtocol

    init(sessionHandler: SessionHandlerProtocol,
         entityPoller: EntityPollerProtocol) {
        self.sessionHandler = sessionHandler
        self.entityPoller = entityPoller
    }

    /// Builds the stream of actions for one attempt to start a session.
    /// - Parameter userId: Identifier of the user the session is created for.
    /// - Returns: The merged stream of UI-state, error, load-data and start-session actions.
    ///
    /// NOTE(review): the non-matching branches return `.never()`, so the merged stream does not
    /// complete on its own; the subscriber has to dispose it. Confirm that is intended.
    func handleSession(with userId: String) -> Observable<ViewModelAction> {
        // Initial UI state: block input and show the loading indicator.
        let initialUIObservable = Observable.of(
            ViewModelAction.inputEnabled(isEnabled: false),
            ViewModelAction.loadingIndicatorShown(isShown: true)
        )

        // Session creation, materialized so success and failure can be split into two streams.
        let sharedCreateSession = sessionHandler.create(for: userId)
            .materialize()
            .share()

        let createSessionError = sharedCreateSession
            .flatMapLatest { event -> Observable<ViewModelAction> in
                switch event {
                case .error(let error):
                    return Observable.of(
                        ViewModelAction.inputEnabled(isEnabled: true),
                        ViewModelAction.loadingIndicatorShown(isShown: false),
                        ViewModelAction.errorMessageShownIfAvailable(error: error)
                    )
                default:
                    return .never()
                }
            }

        let createSessionSuccess = sharedCreateSession
            .flatMapLatest { event -> Observable<SessionData> in
                switch event {
                case .next(let session):
                    return .just(session)
                default:
                    return .never()
                }
            }

        // Start polling once a session exists and pair the session with the polled entity.
        // NOTE(review): `withLatestFrom` samples `entityPoller.entity` only when a session is
        // emitted; confirm that later poll results actually reach this stream.
        let sharedEntityPoller = createSessionSuccess
            .do(onNext: { [weak self] in self?.entityPoller.startPolling(for: $0) })
            .withLatestFrom(entityPoller.entity) { ($0, $1) }
            .materialize()
            .share()

        let entityPollerError = sharedEntityPoller
            .flatMapLatest { event -> Observable<ViewModelAction> in
                switch event {
                case .error(let error):
                    return .just(ViewModelAction.errorMessageShownIfAvailable(error: error))
                default:
                    return .never()
                }
            }

        // Only a non-nil entity whose status is `.ready` lets the flow continue; polling is
        // stopped as soon as one arrives.
        let entityPollerSuccessWithReadyStatus = sharedEntityPoller
            .flatMapLatest { event -> Observable<(SessionData, Entity)> in
                switch event {
                case .next(let pair):
                    guard let entity = pair.1, entity.status == .ready else { return .never() }
                    return .just((pair.0, entity))
                default:
                    return .never()
                }
            }
            .do(onNext: { [weak self] _ in self?.entityPoller.stopPolling() })

        let doOnEntityPollerSuccessWithReadyStatus = entityPollerSuccessWithReadyStatus
            .map { ViewModelAction.loadUIData(with: $0.1) }

        // Start the session for the ready entity; materialized for the same success/failure split.
        let sharedStartSession = entityPollerSuccessWithReadyStatus
            .flatMapLatest { [weak self] pair -> Observable<(SessionData, Entity)> in
                guard let self = self else { return .empty() }
                let (session, entity) = pair
                return self.sessionHandler
                    .start(session: session, entity: entity)
                    .map { (session, entity) }
            }
            .materialize()
            .share()

        let startSessionError = sharedStartSession
            .flatMapLatest { event -> Observable<ViewModelAction> in
                switch event {
                case .error(let error):
                    return Observable.of(
                        ViewModelAction.inputEnabled(isEnabled: true),
                        ViewModelAction.loadingIndicatorShown(isShown: false),
                        ViewModelAction.errorMessageShownIfAvailable(error: error)
                    )
                default:
                    return .never()
                }
            }

        let startSessionSuccess = sharedStartSession
            .flatMapLatest { event -> Observable<ViewModelAction> in
                switch event {
                case .next(let element):
                    return Observable.of(
                        ViewModelAction.inputEnabled(isEnabled: true),
                        ViewModelAction.loadingIndicatorShown(isShown: false),
                        ViewModelAction.startSession(for: element.1)
                    )
                default:
                    return .never()
                }
            }

        return Observable.merge(
            initialUIObservable,
            createSessionError,
            entityPollerError,
            doOnEntityPollerSuccessWithReadyStatus,
            startSessionError,
            startSessionSuccess
        )
    }
}
As you can see, the function is pretty big and not that clear. Do you have any suggestions for refactoring this into cleaner code? Thanks.
If you start with a flow chart, you will end up with an imperative solution. Instead consider each side effect independently. Make each observable sequence a declaration of what causes that side effect.
The essence of your code is this (Note, I wrapped up your entity poller into a more reasonable interface. That wrapper is shown later):
// Sink for failures from every stage: each `catch` below forwards its error here and
// replaces the failed stream with an empty one.
let errors = PublishSubject<Error>()
// Stage 1: create the session for the user; shared among the stages below.
let session = sessionHandler.create(for: userId)
.catch { errors.onSuccess($0); return .empty() }
.share()
// Stage 2: for each session, run the wrapped poller and forward the entities it emits.
let entity = session
.flatMapLatest { [entityPoller] in
entityPoller($0)
.catch { errors.onSuccess($0); return .empty() }
}
.share()
// Stage 3: every non-nil entity whose status is `.ready` is paired with the latest session
// and used to start the session; `take(1)` ends the stream after the first emitted start.
let started = entity
.compactMap { $0 }
.filter { $0.status == .ready }
.withLatestFrom(session) { ($1, $0) }
.flatMapLatest { [sessionHandler] session, entity in
sessionHandler.start(session: session, entity: entity)
.catch { errors.onSuccess($0); return .empty() }
}
.take(1)
.share()
Everything else is just notifications. I think showing the above, cleanly and without complications, is a huge simplification and will aid others in understanding your code (including future you).
Here is what I ended up with, including the wrapper around the poller I mentioned above:
/// Runs the create-session -> poll-entity -> start-session flow and reports every step
/// as a `ViewModelAction`.
class StartSessionStrategy {
    private let sessionHandler: SessionHandlerProtocol
    /// The poller wrapped as a function from a session to the stream of polled entities.
    private let entityPoller: (SessionData) -> Observable<Entity?>

    init(sessionHandler: SessionHandlerProtocol, poller: EntityPollerProtocol) {
        self.sessionHandler = sessionHandler
        self.entityPoller = entityPolling(poller)
    }

    /// Builds the stream of actions for one attempt to start a session.
    /// - Parameter userId: Identifier of the user the session is created for.
    /// - Returns: The merged stream of input-enabled, loading-indicator, error, load-UI-data
    ///   and start-session actions.
    func handleSession(with userId: String) -> Observable<ViewModelAction> {
        // The fundamental operations.

        // Sink for failures from every stage: each `catch` forwards its error here and
        // replaces the failed stream with an empty one.
        let errors = PublishSubject<Error>()

        let session = sessionHandler.create(for: userId)
            .catch { errors.onSuccess($0); return .empty() }
            .share()

        let entity = session
            .flatMapLatest { [entityPoller] in
                entityPoller($0)
                    .catch { errors.onSuccess($0); return .empty() }
            }
            .share()

        let started = entity
            .compactMap { $0 }
            .filter { $0.status == .ready }
            .withLatestFrom(session) { ($1, $0) }
            .flatMapLatest { [sessionHandler] session, entity in
                sessionHandler.start(session: session, entity: entity)
                    .catch { errors.onSuccess($0); return .empty() }
            }
            .take(1)
            .share()

        // Now go through all the notifications.

        // Fires once the session has started, or when an error arrives before that.
        let finished = Observable.merge(
            started,
            errors.take(until: started).map(to: ())
        )

        // Input is disabled at start, then enabled once started or if an error occurs.
        let inputEnabled = finished
            .map(to: ViewModelAction.inputEnabled(isEnabled: true))
            .startWith(ViewModelAction.inputEnabled(isEnabled: false))
            .take(2)

        // Show the loading indicator at start, remove it once started or if an error occurs.
        let loadingIndicator = finished
            .map(to: ViewModelAction.loadingIndicatorShown(isShown: false))
            .startWith(ViewModelAction.loadingIndicatorShown(isShown: true))
            .take(2)

        // Emit loadUIData for the first entity that is ready.
        // The case is declared as `loadUIData(with:)`, so the call-site label must be `with:`.
        let loadUIData = entity
            .compactMap { $0 }
            .filter { $0.status == .ready }
            .map { ViewModelAction.loadUIData(with: $0) }
            .take(1)

        // Emit the startSession event once the session starts.
        // The case is declared as `startSession(for:)`, so the call-site label must be `for:`.
        let startSession = started
            .withLatestFrom(entity.compactMap { $0 })
            .map { entity in ViewModelAction.startSession(for: entity) }
            .take(until: errors)

        // Emit an error message for every error that arrives before the session has started.
        let errorMessage = errors
            .take(until: started)
            .map { ViewModelAction.errorMessageShownIfAvailable(error: $0) }

        return Observable.merge(
            inputEnabled,
            loadingIndicator,
            errorMessage,
            loadUIData,
            startSession
        )
    }
}
/// Wraps an `EntityPollerProtocol` as a function from a session to an observable of entities.
///
/// Subscribing to the returned observable subscribes to `poller.entity` and then starts
/// polling for the session; disposing it disposes that subscription and then stops polling.
/// - Parameter poller: The poller to wrap.
/// - Returns: A closure that builds the polling observable for a given session.
func entityPolling(_ poller: EntityPollerProtocol) -> (SessionData) -> Observable<Entity?> {
    return { session in
        return Observable.create { observer in
            // Forward the poller's entities to the subscriber before polling is started.
            let entitySubscription = poller.entity.subscribe(observer)
            poller.startPolling(for: session)

            return Disposables.create {
                entitySubscription.dispose()
                poller.stopPolling()
            }
        }
    }
}
extension ObserverType {
    /// Sends `element` to the observer and immediately follows it with a completion event.
    /// - Parameter element: The element to deliver before completing.
    func onSuccess(_ element: Element) {
        on(.next(element))
        on(.completed)
    }
}
extension ObservableType {
    /// Replaces every element of the sequence with the same constant value.
    /// - Parameter to: The value emitted in place of each element.
    /// - Returns: An observable that emits `to` once per source element.
    func map<T>(to constant: T) -> Observable<T> {
        map { _ in constant }
    }
}