Search code examples
swift, observable, reactive-programming, rx-swift

RxSwift to juggle local database and remote network?


Currently I have services that manage retrieving data from local storage, and that also check the remote network for any modified data. They use a completion handler with the Result pattern and a protocol type, but I would like to convert this to an observable approach.

Here is the current logic:

/// Coordinates author lookups between a local store and a remote source.
struct AuthorWorker: AuthorWorkerType, Loggable {
    /// Local storage that is read first and written back to.
    private let store: AuthorStore
    /// Remote source consulted for newer data.
    private let remote: AuthorRemote

    /// Creates a worker backed by the given local store and remote source.
    /// - Parameters:
    ///   - store: The local storage to use.
    ///   - remote: The remote source to use.
    init(store localStore: AuthorStore, remote remoteSource: AuthorRemote) {
        store = localStore
        remote = remoteSource
    }
}

extension AuthorWorker {

    /// Fetches the author with the given id, local storage first.
    ///
    /// `completion` is always called with the local result. If that result is
    /// a success, the remote is queried as well; when the remote copy has a
    /// later `modifiedAt`, it is written to the store and `completion` is
    /// called a second time with the result of that write.
    ///
    /// - Parameters:
    ///   - id: Identifier of the author to fetch.
    ///   - completion: Called once with the local result and, optionally, a
    ///     second time after a newer remote copy has been stored.
    func fetch(id: Int, completion: @escaping (Result<AuthorType, DataError>) -> Void) {
        store.fetch(id: id) { localResult in
            // The caller gets the cached answer right away.
            completion(localResult)

            // Without a cached element there is nothing to compare against.
            guard case .success(let cached) = localResult else { return }

            remote.fetch(id: id) { remoteResult in
                // Remote failures are ignored; the cached answer stands.
                guard case .success(let fresh) = remoteResult else { return }

                // Only a strictly newer remote copy triggers an update.
                guard fresh.modifiedAt > cached.modifiedAt else { return }

                self.store.createOrUpdate(fresh) { saveResult in
                    // Report back only when the write succeeded.
                    if case .success = saveResult {
                        completion(saveResult)
                    }
                }
            }
        }
    }
}

I always return the local data to the UI immediately so the user doesn't wait. In the background, the worker checks the remote network for modified data and updates the UI again only if necessary. I use it like this:

// Shows the author's name each time the worker reports a successful result:
// once for the cached copy and again if a newer remote copy was stored.
authorWorker.fetch(id: 1) { [weak self] result in
    guard case .success(let value) = result else {
        // alert error
        return // a guard body must exit the scope
    }

    self?.myLabel.text = value.name
}

How can this be converted to RxSwift or an observable concept? This is what I have so far, but I don't see the code on the walls like Neo yet when it comes to Rx, so I need help seeing the light.

extension AuthorWorker {

    /// Fetches the author with the given id as an observable sequence.
    ///
    /// The sequence emits the locally stored author first. If the remote copy
    /// has a later `modifiedAt`, that copy is written to the store and the
    /// stored element is emitted as a second value. The sequence then completes.
    ///
    /// A failure of the local fetch terminates the sequence with that error.
    /// A failure of the remote fetch or of the store update is not surfaced:
    /// the sequence simply completes after the cached value.
    ///
    /// - Parameter id: Identifier of the author to fetch.
    /// - Returns: An observable emitting one or two authors before completing,
    ///   or an error if the local fetch fails.
    func fetch(id: Int) -> Observable<AuthorType> {
        return Observable<AuthorType>.create { observer in
            self.store.fetch(id: id) { localResult in
                switch localResult {
                case .failure(let error):
                    // Surface the failure instead of silently completing.
                    observer.onError(error)

                case .success(let cacheElement):
                    // Emit the unwrapped element, not the Result wrapper.
                    observer.onNext(cacheElement)

                    // Sync remote updates to cache if applicable
                    self.remote.fetch(id: id) { remoteResult in
                        // Finish unless the remote copy is strictly newer.
                        guard case .success(let element) = remoteResult,
                            element.modifiedAt > cacheElement.modifiedAt else {
                                observer.onCompleted()
                                return
                        }

                        // Update local storage with updated data
                        self.store.createOrUpdate(element) { saveResult in
                            // Emit again only if the write succeeded.
                            if case .success(let saved) = saveResult {
                                observer.onNext(saved)
                            }
                            observer.onCompleted()
                        }
                    }
                }
            }

            // `create` requires a Disposable; the callback-based fetches offer
            // nothing to cancel, so an empty one is returned.
            return Disposables.create()
        }
    }
}

Then I would use it like this?

// NOTE(review): `subscribe` returns a Disposable that is not retained here — confirm how it should be disposed.
authorWorker.fetch(id: 1).subscribe { [weak self] event in
    switch event {
    case .next(let element):
        self?.myLabel.text = element.name
    case .error:
        // Handle error how?
        break
    case .completed:
        break
    }
}

Is this the right approach or is there a more recommended way to do this? Is it also worth converting the underlying remote and local stores to observable as well, or does it make sense not to convert all things to observable all the time?


Solution

  • New Answer

    Based on the comment, I see that you want something much more elaborate than my first answer, so here you go.

    /// Combines a local-store stream and a remote stream into two outputs.
    ///
    /// - Parameters:
    ///   - store: Stream of elements read from local storage.
    ///   - remote: Stream of elements fetched remotely.
    /// - Returns: A tuple where `value` merges both inputs, drops consecutive
    ///   duplicates and ends once `remote` produces a stop event, and `store`
    ///   emits the remote element of each zipped store/remote pair whose two
    ///   elements differ.
    func worker<T: Equatable>(store: Observable<T>, remote: Observable<T>) -> (value: Observable<T>, store: Observable<T>) {
        // Both inputs are consumed by more than one chain below, so share them.
        let localSource = store.share(replay: 1)
        let remoteSource = remote.share(replay: 1)

        // Signal that fires when the remote stream produces a stop event.
        let remoteFinished = remoteSource
            .materialize()
            .filter { event in event.isStopEvent }

        let emitted = Observable.merge(localSource, remoteSource)
            .distinctUntilChanged()
            .takeUntil(remoteFinished)

        // Elements that should be written back: remote values that differ
        // from the stored value they were paired with.
        let pendingWrites = Observable.zip(localSource, remoteSource)
            .filter { pair in pair.0 != pair.1 }
            .map { pair in pair.1 }

        return (value: emitted, store: pendingWrites)
    }
    

    Here is the code above being used in your AuthorWorker type:

    extension AuthorWorker {
        /// Fetches the author with the given id by feeding the store and remote
        /// observables through `worker(store:remote:)`.
        ///
        /// - Parameter id: Identifier of the author to fetch.
        /// - Returns: The `value` stream produced by `worker`.
        func fetch(id: Int) -> Observable<AuthorType> {
            let streams = worker(store: store.fetch(id: id), remote: remote.fetch(id: id))

            // Elements on the `store` stream are handed to the local store.
            // NOTE(review): this subscription starts when `fetch` is called and
            // its Disposable is discarded — confirm that is intended.
            _ = streams.store
                .subscribe(onNext: store.createOrUpdate)

            return streams.value
        }
    }
    

    And here is a test suite proving it works properly:

    /// Exercises `worker(store:remote:)` with scripted cold observables.
    class Tests: XCTestCase {

        var scheduler: TestScheduler!
        /// Records what the `value` output emits.
        var emission: TestableObserver<String>!
        /// Records what the `store` output emits.
        var storage: TestableObserver<String>!
        var disposeBag: DisposeBag!

        override func setUp() {
            super.setUp()
            scheduler = TestScheduler(initialClock: 0)
            emission = scheduler.createObserver(String.self)
            storage = scheduler.createObserver(String.self)
            disposeBag = DisposeBag()
        }

        /// Store answers first, remote follows with a different value.
        func testHappyPath() {
            simulate(
                store: [.next(10, "store"), .completed(11)],
                remote: [.next(20, "remote"), .completed(21)]
            )

            XCTAssertEqual(emission.events, [.next(10, "store"), .next(20, "remote"), .completed(21)])
            XCTAssertEqual(storage.events, [.next(20, "remote"), .completed(21)])
        }

        /// Store answers first, remote follows with the same value.
        func testSameValue() {
            simulate(
                store: [.next(10, "store"), .completed(11)],
                remote: [.next(20, "store"), .completed(21)]
            )

            XCTAssertEqual(emission.events, [.next(10, "store"), .completed(21)])
            XCTAssertEqual(storage.events, [.completed(21)])
        }

        /// Remote answers before the store, with a different value.
        func testRemoteFirst() {
            simulate(
                store: [.next(20, "store"), .completed(21)],
                remote: [.next(10, "remote"), .completed(11)]
            )

            XCTAssertEqual(emission.events, [.next(10, "remote"), .completed(11)])
            XCTAssertEqual(storage.events, [.next(20, "remote"), .completed(21)])
        }

        /// Remote answers before the store, with the same value.
        func testRemoteFirstSameValue() {
            simulate(
                store: [.next(20, "store"), .completed(21)],
                remote: [.next(10, "store"), .completed(11)]
            )

            XCTAssertEqual(emission.events, [.next(10, "store"), .completed(11)])
            XCTAssertEqual(storage.events, [.completed(21)])
        }

        /// Builds cold observables from the scripted events, wires both
        /// `worker` outputs to the recording observers and runs the scheduler.
        private func simulate(
            store storeEvents: [Recorded<Event<String>>],
            remote remoteEvents: [Recorded<Event<String>>]
        ) {
            let storeProducer = scheduler.createColdObservable(storeEvents)
            let remoteProducer = scheduler.createColdObservable(remoteEvents)

            let outputs = worker(store: storeProducer.asObservable(), remote: remoteProducer.asObservable())

            disposeBag.insert(
                outputs.value.subscribe(emission),
                outputs.store.subscribe(storage)
            )

            scheduler.start()
        }
    }
    

    Previous Answer

    I'd be inclined to aim for a usage like this:

    // One shared fetch feeds both the label binding and the error handler.
    let result = authorWorker.fetch(id: 1)
        .share()

    // Text for the label: the author's description, or "" if the stream errors.
    let labelText = result
        .map { author in author.description }
        .catchErrorJustReturn("")

    labelText
        .bind(to: myLabel.rx.text)
        .disposed(by: disposeBag)

    // Errors are handled separately from the label binding.
    result
        .subscribe(onError: { failure in
            // handle error here
        })
        .disposed(by: disposeBag)
    

    The above can be accomplished if you have something like the below for example:

    extension AuthorWorker {

        /// Fetches the author with the given id from both the store and the
        /// remote, merging their emissions and dropping consecutive duplicates.
        ///
        /// - Parameter id: Identifier of the author to fetch.
        /// - Returns: The merged, de-duplicated stream of authors.
        func fetch(id: Int) -> Observable<AuthorType> {
            let cached: Observable<AuthorType> = store.fetch(id: id)
            let fetched: Observable<AuthorType> = remote.fetch(id: id)
            let merged = Observable.merge(cached, fetched)
            return merged.distinctUntilChanged()
        }
    }
    
    extension AuthorStore {
        /// Wraps the completion-handler `fetch(id:completion:)` in an observable.
        ///
        /// - Parameter id: Identifier of the author to fetch.
        /// - Returns: An observable that emits the fetched author and completes
        ///   on success, or terminates with the reported error on failure.
        func fetch(id: Int) -> Observable<AuthorType> {
            return Observable<AuthorType>.create { subscriber in
                self.fetch(id: id) { outcome in
                    switch outcome {
                    case .failure(let reason):
                        subscriber.onError(reason)
                    case .success(let author):
                        subscriber.onNext(author)
                        subscriber.onCompleted()
                    }
                }
                // The wrapped call exposes nothing to cancel.
                return Disposables.create()
            }
        }
    }
    
    extension AuthorRemote {
        /// Wraps the completion-handler `fetch(id:completion:)` in an observable.
        ///
        /// - Parameter id: Identifier of the author to fetch.
        /// - Returns: An observable that emits the fetched author and completes
        ///   on success, or terminates with the reported error on failure.
        func fetch(id: Int) -> Observable<AuthorType> {
            return Observable<AuthorType>.create { subscriber in
                self.fetch(id: id) { outcome in
                    switch outcome {
                    case .failure(let reason):
                        subscriber.onError(reason)
                    case .success(let author):
                        subscriber.onNext(author)
                        subscriber.onCompleted()
                    }
                }
                // The wrapped call exposes nothing to cancel.
                return Disposables.create()
            }
        }
    }