Search code examples
iosrx-swift

How to create a function that returns an observable that will emit multiple times?


I've got a function that will return an observable that will emit an enum based on what stage it is currently on. The enum basically goes like this:

enum ViewModelAction {
    case inputEnabled(isEnabled: Bool)
    case loadingIndicatorShown(isShown: Bool)
    case errorMessageShownIfAvailable(error: Error?)
    case loadUIData(with entity: Entity)
    case startSession(for entity: Entity)
    case endSession(for entity: Entity)
    case loadEndUIData(with entity: Entity)
}

Now what I want the function do is like so:

  1. Function start with input params -> go to 2
  2. Do:
    • Emit disable input from UI
    • Emit show loading indicator
    • Call API to create session based on input params -> API returns session data -> go to 3
  3. Do:
    • If success -> go to 4

    • If error:

      • Emit enable input from UI
      • Emit hide loading indicator
      • Emit show error with the error -> Complete stream
  4. Do:
    • Loop and call API to poll the entity based on the session with the status of READY:

      • If success with READY status -> Emit load UI data with entity -> go to 5
      • If success with non READY status -> loop again / back to 4
      • If error -> Emit show error with the error -> Loop again / back to 4
  5. Do:
    • Call API to start session using the entity -> API returns session data -> go to 6
  6. Do:
    • If success:

      • Emit enable input from UI
      • Emit hide loading indicator
      • Emit start session for entity -> Complete stream
    • If error:

      • Emit enable input from UI
      • Emit hide loading indicator
      • Emit show error with the error -> Complete stream

So in a way, for a successful run, it will emit this exact sequence:

Observable.merge(
    .just(ViewModelAction.inputEnabled(isEnabled: false)),
    .just(ViewModelAction.loadingIndicatorShown(isShown: true)),
    .just(ViewModelAction.loadUIData(with: entity)),
    .just(ViewModelAction.inputEnabled(isEnabled: true)),
    .just(ViewModelAction.loadingIndicatorShown(isShown: false)),
    .just(ViewModelAction.startSession(for: entity)),
    .complete()
)

I have actually done this, but I was thinking that it is not that clear and very complicated.

protocol SessionHandlerProtocol {
    func create(for userId: String) -> Observable<SessionData>
    func start(session: SessionData, entity: Entity) -> Observable<Void>
}

protocol EntityPollerProtocol {
    var entity: Observable<Entity?> { get }
    func startPolling(for session: SessionData) //Will poll and emit to entity observable
    finc stopPolling()
}

class StartSessionStrategy {
    private let sessionHandler: SessionHandlerProtocol
    private let entityPoller: EntityPollerProtocol
    init(sessionHandler: SessionHandlerProtocol,
         entityPoller: EntityPollerProtocol) {

        self.sessionHandler = sessionHandler
        self.entityPoller = entityPoller
    }

    func handleSession(with userId: String) -> Observable<ViewModelAction> {
        let initialUIObservable = 
            Observable.from(ViewModelAction.inputEnabled(isEnabled: false),
                            ViewModelAction.loadingIndicatorShown(isShown: true))

        let sharedCreateSession = 
            sessionHandler.create(for: userID).materialize.share()

        let createSessionError =
            sharedCreateSession
                .flatMapLatest {
                    switch $0 {
                    case .error(let error):
                        return Observable.merge(
                            .just(ViewModelAction.inputEnabled(isEnabled: true)),
                            .just(ViewModelAction.loadingIndicatorShown(isShown: false)),
                            .just(ViewModelAction.errorMessageShownIfAvailable(error: error)),
                            .complete()
                        )
                    default:
                        return .never()
                }

        let createSessionSuccess = 
            sharedCreateSession
                .flatMapLatest {
                    switch $0 {
                    case .next(let element):
                        return .just(element)
                    default:
                        return .never()
                }

        let sharedEntityPoller = 
            createSessionSuccess
                .do(onNext: { [weak self] in self?.entityPoller.startPolling(for: $0) })
                .withLatestFrom(entityPoller.entity) { return ($0, $1) }
                .materialize()
                .share()

        let entityPollerError =
            sharedEntityPoller
                .flatMapLatest {
                    switch $0 {
                    case .error(let error):
                        return .just(ViewModelAction.errorMessageShownIfAvailable(error: error))
                    default:
                        return .never()
                }

        let entityPollerSuccessWithReadyStatus = 
            sharedEntityPoller
                .filter { (_, entity) entity.status = .ready }
                .flatMapLatest {
                    switch $0 {
                    case .next(let element):
                        return .just(element)
                    default:
                        return .never()
                }
                .do(onNext: { [weak self] _ in self?.stopPolling() })

        let doOnEntityPollerSuccessWithReadyStatus = 
                entityPollerSuccessWithReadyStatus
                    .map { return ViewModelAction.loadUIData(with: $0.1) }

        let sharedStartSession = 
            entityPollerSuccessWithReadyStatus
                .flatMapLatest { [weak self] (session, entity) in
                    self?.sessionHandler
                        .start(session: userID, entity: entity)
                        .map { return (session, entity) }
                }.materialize.share()

        let startSessionError =
            sharedStartSession
                .flatMapLatest {
                    switch $0 {
                    case .error(let error):
                        return Observable.merge(
                            .just(ViewModelAction.inputEnabled(isEnabled: true)),
                            .just(ViewModelAction.loadingIndicatorShown(isShown: false)),
                            .just(ViewModelAction.errorMessageShownIfAvailable(error: error)),
                            .complete()
                        )
                    default:
                        return .never()
                }

        let startSessionSuccess = 
            sharedStartSession
                .flatMapLatest {
                    switch $0 {
                    case .next(let element):
                        return Observable.merge(
                            .just(ViewModelAction.inputEnabled(isEnabled: true)),
                            .just(ViewModelAction.loadingIndicatorShown(isShown: false)),
                            .just(ViewModelAction.startSession(for: element.1)),
                            .complete
                        )
                    default:
                        return .never()
                }

        return Observable.merge(
            initialUIObservable,
            createSessionError,
            entityPollerError,
            doOnEntityPollerSuccessWithReadyStatus,
            startSessionError,
            startSessionSuccess
        )
    }
}

As you can see, the function is pretty big and not that clear. Do you have a suggestion how to refactor this into a cleaner code? Thanks.


Solution

  • If you start with a flow chart, you will end up with an imperative solution. Instead consider each side effect independently. Make each observable sequence a declaration of what causes that side effect.

    The essence of your code is this (Note, I wrapped up your entity poller into a more reasonable interface. That wrapper is shown later):

    let errors = PublishSubject<Error>()
    let session = sessionHandler.create(for: userId)
        .catch { errors.onSuccess($0); return .empty() }
        .share()
    
    let entity = session
        .flatMapLatest { [entityPoller] in
            entityPoller($0)
                .catch { errors.onSuccess($0); return .empty() }
        }
        .share()
    
    let started = entity
        .compactMap { $0 }
        .filter { $0.status == .ready }
        .withLatestFrom(session) { ($1, $0) }
        .flatMapLatest { [sessionHandler] session, entity in
            sessionHandler.start(session: session, entity: entity)
                .catch { errors.onSuccess($0); return .empty() }
        }
        .take(1)
        .share()
    

    Everything else is just notifications. I think showing the above, cleanly and without complications, is a huge simplification and will aid in other's understanding your code (including future you.)

    Here is what I ended up with, including the wrapper around the poller I mentioned above:

    class StartSessionStrategy {
        private let sessionHandler: SessionHandlerProtocol
        private let entityPoller: (SessionData) -> Observable<Entity?>
    
        init(sessionHandler: SessionHandlerProtocol, poller: EntityPollerProtocol) {
            self.sessionHandler = sessionHandler
            self.entityPoller = entityPolling(poller)
        }
    
        func handleSession(with userId: String) -> Observable<ViewModelAction> {
            // the fundamental operations as above:
            let errors = PublishSubject<Error>()
            let session = sessionHandler.create(for: userId)
                .catch { errors.onSuccess($0); return .empty() }
                .share()
    
            let entity = session
                .flatMapLatest { [entityPoller] in
                    entityPoller($0)
                        .catch { errors.onSuccess($0); return .empty() }
                }
                .share()
    
            let started = entity
                .compactMap { $0 }
                .filter { $0.status == .ready }
                .withLatestFrom(session) { ($1, $0) }
                .flatMapLatest { [sessionHandler] session, entity in
                    sessionHandler.start(session: session, entity: entity)
                        .catch { errors.onSuccess($0); return .empty() }
                }
                .take(1)
                .share()
    
            // now go through all the notifications:
    
            // input is disabled at start, then enabled once started or if an error occurs
            let inputEnabled = Observable.merge(
                started,
                errors.take(until: started).map(to: ())
            )
            .map(to: ViewModelAction.inputEnabled(isEnabled: true))
            .startWith(ViewModelAction.inputEnabled(isEnabled: false))
            .take(2)
    
            // show the loading indicator at start, remove it once started or if an error occurs
            let loadingIndicator = Observable.merge(
                started,
                errors.take(until: started).map(to: ())
            )
            .map(to: ViewModelAction.loadingIndicatorShown(isShown: false))
            .startWith(ViewModelAction.loadingIndicatorShown(isShown: true))
            .take(2)
    
            // emit the loadUIData if an entity is ready.
            let loadUIData = entity
                .compactMap { $0 }
                .filter { $0.status == .ready }
                .map { ViewModelAction.loadUIData(entity: $0) }
                .take(1)
    
            // emit the startSession event once the session starts.
            let startSession = started
                .withLatestFrom(entity.compactMap { $0 })
                .map { entity in ViewModelAction.startSession(entity: entity) }
                .take(until: errors)
    
            return Observable.merge(
                inputEnabled,
                loadingIndicator,
                errors
                    .take(until: started)
                    .map { ViewModelAction.errorMessageShownIfAvailable(error: $0) }, // emit an error message for all errors.
                loadUIData,
                startSession
            )
        }
    }
    
    func entityPolling(_ poller: EntityPollerProtocol) -> (SessionData) -> Observable<Entity?> {
        { session in
            Observable.create { observer in
                let disposable = poller.entity
                    .subscribe(observer)
                poller.startPolling(for: session)
                return Disposables.create {
                    disposable.dispose()
                    poller.stopPolling()
                }
            }
        }
    }
    
    extension ObserverType {
        func onSuccess(_ element: Element) -> Void {
            onNext(element)
            onCompleted()
        }
    }
    
    extension ObservableType {
        func map<T>(to: T) -> Observable<T> {
            return map { _ in to }
        }
    }