Currently I have services that manage retrieving data from local storage and also check the remote network for any modified data. They use a completion handler with the Result
pattern and a protocol type, but I would like to convert this to an observable approach.
Here is the current logic:
/// Worker that serves authors from a local store and a remote source.
struct AuthorWorker: AuthorWorkerType, Loggable {
    /// Local storage used for the immediate response and for persisting updates.
    private let store: AuthorStore

    /// Remote source consulted for modified data.
    private let remote: AuthorRemote

    /// Creates a worker backed by the given store and remote.
    init(store: AuthorStore, remote: AuthorRemote) {
        self.store = store
        self.remote = remote
    }
}
extension AuthorWorker {
    /// Fetches an author, answering from the local store first and then syncing with the remote.
    ///
    /// `completion` is always called once with the local store's result. It is called a second
    /// time only when the remote copy has a later `modifiedAt` than the cached one and saving it
    /// to the store succeeded.
    ///
    /// - Parameters:
    ///   - id: Identifier handed to both the store and the remote.
    ///   - completion: Receives the local result, and possibly the saved remote update afterwards.
    func fetch(id: Int, completion: @escaping (Result<AuthorType, DataError>) -> Void) {
        store.fetch(id: id) { cachedResult in
            // The caller gets the local answer straight away, success or failure.
            completion(cachedResult)

            // Without a cached element there is nothing to compare the remote copy against.
            guard case .success(let cached) = cachedResult else { return }

            self.remote.fetch(id: id) { remoteResult in
                // Remote failures and unchanged/older data are dropped silently.
                guard case .success(let fresh) = remoteResult,
                    fresh.modifiedAt > cached.modifiedAt else {
                    return
                }

                // Persist the newer element, then notify the caller again on success only.
                self.store.createOrUpdate(fresh) { saveResult in
                    if case .success = saveResult {
                        completion(saveResult)
                    }
                }
            }
        }
    }
}
I always return the local data to the UI immediately so the user doesn't wait. In the background, it checks the remote network for modified data and updates the UI again only if necessary. I use it like this:
// The handler may run twice: once with the local result and again if a newer remote copy was saved.
authorWorker.fetch(id: 1) { [weak self] in
    guard case .success(let value) = $0 else {
        // alert error
        return // a guard's else branch must leave the scope
    }
    self?.myLabel.text = value.name
}
How can this be converted to RxSwift or an observable concept? This is what I got started, but I don't see the code on the walls like Neo yet when it comes to Rx, so I need help seeing the light.
extension AuthorWorker {
    /// Observable counterpart of the completion-handler based fetch.
    ///
    /// Emits the locally stored author first. If the remote copy has a later `modifiedAt`
    /// and saving it to the store succeeds, the saved author is emitted as a second element.
    /// The sequence then completes.
    ///
    /// - Parameter id: Identifier handed to both the store and the remote.
    /// - Returns: A sequence of one or two authors followed by completion. It terminates with
    ///   the store's error when the local lookup fails. Remote and save failures are not
    ///   surfaced; the sequence simply completes after the local element.
    func fetch(id: Int) -> Observable<AuthorType> {
        return Observable<AuthorType>.create { observer in
            self.store.fetch(id: id) { localResult in
                // Unwrap the Result: the observer takes elements and errors, not Result values.
                let cached: AuthorType
                switch localResult {
                case .success(let value):
                    observer.onNext(value)
                    cached = value
                case .failure(let error):
                    observer.onError(error)
                    return
                }

                // Sync remote updates to cache if applicable
                self.remote.fetch(id: id) { remoteResult in
                    guard case .success(let fresh) = remoteResult,
                        fresh.modifiedAt > cached.modifiedAt else {
                        observer.onCompleted()
                        return
                    }

                    // Update local storage with updated data, emitting again only when saved.
                    self.store.createOrUpdate(fresh) { saveResult in
                        if case .success(let saved) = saveResult {
                            observer.onNext(saved)
                        }
                        observer.onCompleted()
                    }
                }
            }
            // `create` requires a Disposable; the callback-based fetches expose nothing to cancel.
            return Disposables.create()
        }
    }
}
Then I would use it like this?
// NOTE(review): subscribe returns a Disposable; keep it (e.g. in a DisposeBag) if the
// subscription should be cancelled when its owner goes away.
authorWorker.fetch(id: 1).subscribe { [weak self] event in
    // Switching on the event separates a failure from a normal completion.
    switch event {
    case .next(let element):
        self?.myLabel.text = element.name
    case .error:
        // Handle the error here; use `case .error(let error)` to inspect it.
        break
    case .completed:
        break
    }
}
Is this the right approach or is there a more recommended way to do this? Is it also worth converting the underlying remote and local stores to observable as well, or does it make sense not to convert all things to observable all the time?
Based on the comment, I see that you want something much more elaborate than my first answer, so here you go.
/// Combines a local and a remote source into a display stream and a persistence stream.
///
/// - Parameters:
///   - store: Sequence producing the locally stored value.
///   - remote: Sequence producing the remote value.
/// - Returns: `value` merges both sources, drops consecutive duplicates, and stops as soon as
///   the remote source completes or errors. `store` pairs the sources element by element and
///   emits the remote element of each pair whose two sides differ.
func worker<T: Equatable>(store: Observable<T>, remote: Observable<T>) -> (value: Observable<T>, store: Observable<T>) {
    // Both outputs are derived from the same shared sources.
    let cached = store.share(replay: 1)
    let fetched = remote.share(replay: 1)

    // Fires once the remote sequence terminates, whether by completion or error.
    let remoteStopped = fetched
        .materialize()
        .filter { $0.isStopEvent }

    let emitted = Observable.merge(cached, fetched)
        .distinctUntilChanged()
        .takeUntil(remoteStopped)

    let toPersist = Observable.zip(cached, fetched)
        .filter { pair in pair.0 != pair.1 }
        .map { pair in pair.1 }

    return (value: emitted, store: toPersist)
}
Here is the code above being used in your AuthorWorker class:
extension AuthorWorker {
    /// Fetches an author through `worker(store:remote:)`.
    ///
    /// - Parameter id: Identifier handed to both the store and the remote.
    /// - Returns: The `value` stream produced by `worker`.
    func fetch(id: Int) -> Observable<AuthorType> {
        let streams = worker(store: store.fetch(id: id), remote: remote.fetch(id: id))

        // Persist remote changes; the subscription's Disposable is deliberately discarded.
        _ = streams.store.subscribe(onNext: store.createOrUpdate)

        return streams.value
    }
}
And here is a test suite proving it works properly:
/// Exercises `worker(store:remote:)` with scripted cold observables on a `TestScheduler`.
class Tests: XCTestCase {
    var scheduler: TestScheduler!
    /// Records what the `value` output emits.
    var emission: TestableObserver<String>!
    /// Records what the `store` output emits.
    var storage: TestableObserver<String>!
    var disposeBag: DisposeBag!

    override func setUp() {
        super.setUp()
        scheduler = TestScheduler(initialClock: 0)
        emission = scheduler.createObserver(String.self)
        storage = scheduler.createObserver(String.self)
        disposeBag = DisposeBag()
    }

    /// Runs `worker` over the two scripted event lists, recording both outputs, then starts the scheduler.
    private func runWorker(store storeEvents: [Recorded<Event<String>>], remote remoteEvents: [Recorded<Event<String>>]) {
        let storeProducer = scheduler.createColdObservable(storeEvents)
        let remoteProducer = scheduler.createColdObservable(remoteEvents)
        let outputs = worker(store: storeProducer.asObservable(), remote: remoteProducer.asObservable())
        disposeBag.insert(
            outputs.value.subscribe(emission),
            outputs.store.subscribe(storage)
        )
        scheduler.start()
    }

    /// Store answers first and the remote value differs: both are emitted, the remote one is stored.
    func testHappyPath() {
        runWorker(
            store: [.next(10, "store"), .completed(11)],
            remote: [.next(20, "remote"), .completed(21)]
        )
        XCTAssertEqual(emission.events, [.next(10, "store"), .next(20, "remote"), .completed(21)])
        XCTAssertEqual(storage.events, [.next(20, "remote"), .completed(21)])
    }

    /// Store answers first and the remote value is equal: one emission, nothing stored.
    func testSameValue() {
        runWorker(
            store: [.next(10, "store"), .completed(11)],
            remote: [.next(20, "store"), .completed(21)]
        )
        XCTAssertEqual(emission.events, [.next(10, "store"), .completed(21)])
        XCTAssertEqual(storage.events, [.completed(21)])
    }

    /// Remote answers first with a different value: only the remote value is emitted, and it is stored.
    func testRemoteFirst() {
        runWorker(
            store: [.next(20, "store"), .completed(21)],
            remote: [.next(10, "remote"), .completed(11)]
        )
        XCTAssertEqual(emission.events, [.next(10, "remote"), .completed(11)])
        XCTAssertEqual(storage.events, [.next(20, "remote"), .completed(21)])
    }

    /// Remote answers first with an equal value: one emission, nothing stored.
    func testRemoteFirstSameValue() {
        runWorker(
            store: [.next(20, "store"), .completed(21)],
            remote: [.next(10, "store"), .completed(11)]
        )
        XCTAssertEqual(emission.events, [.next(10, "store"), .completed(11)])
        XCTAssertEqual(storage.events, [.completed(21)])
    }
}
I'd be inclined to aim for a usage like this:
// Shared so the label binding and the error handler below subscribe to the same sequence.
let author = authorWorker.fetch(id: 1).share()

// Drive the label; a failure is replaced by an empty string for display purposes.
author
    .map { $0.description }
    .catchErrorJustReturn("")
    .bind(to: myLabel.rx.text)
    .disposed(by: disposeBag)

// Errors are handled separately from the UI binding.
author
    .subscribe(onError: { error in
        // handle error here
    })
    .disposed(by: disposeBag)
The above can be accomplished if you have something like the below for example:
extension AuthorWorker {
    /// Merges the local and remote lookups into one sequence, skipping consecutive equal elements.
    ///
    /// - Parameter id: Identifier handed to both the store and the remote.
    /// - Returns: Elements from the store and the remote, in arrival order, without consecutive duplicates.
    func fetch(id: Int) -> Observable<AuthorType> {
        let cached: Observable<AuthorType> = store.fetch(id: id)
        let fresh: Observable<AuthorType> = remote.fetch(id: id)
        return Observable.merge(cached, fresh).distinctUntilChanged()
    }
}
extension AuthorStore {
    /// Observable wrapper around the completion-handler based `fetch(id:completion:)`.
    ///
    /// - Parameter id: Identifier forwarded to the callback-based fetch.
    /// - Returns: A sequence that emits the fetched element and completes, or fails with the
    ///   error reported by the callback.
    func fetch(id: Int) -> Observable<AuthorType> {
        return Observable.create { observer in
            self.fetch(id: id, completion: { outcome in
                switch outcome {
                case .failure(let error):
                    observer.on(.error(error))
                case .success(let author):
                    observer.on(.next(author))
                    observer.on(.completed)
                }
            })
            // Nothing to tear down when the subscription is disposed.
            return Disposables.create()
        }
    }
}
extension AuthorRemote {
    /// Observable wrapper around the completion-handler based `fetch(id:completion:)`.
    ///
    /// - Parameter id: Identifier forwarded to the callback-based fetch.
    /// - Returns: A sequence that emits the fetched element and completes, or fails with the
    ///   error reported by the callback.
    func fetch(id: Int) -> Observable<AuthorType> {
        return Observable.create { observer in
            self.fetch(id: id, completion: { outcome in
                switch outcome {
                case .failure(let error):
                    observer.on(.error(error))
                case .success(let author):
                    observer.on(.next(author))
                    observer.on(.completed)
                }
            })
            // Nothing to tear down when the subscription is disposed.
            return Disposables.create()
        }
    }
}