RxSwift: Catch an error without emitting an element from the stream


Ultimately, I need a way to catch RxSwift errors without emitting another item to the stream. This doesn't seem possible with the current Rx operators.

Here's how my application works now:

I have 3 layers:

  • Service layer
  • Local data layer
  • Remote data layer

The service layer acts as a bridge between the local and remote layers and is called directly by UI code.

For each function provided in the service layer, e.g. getData() -> Observable<String>, I need to do the following:

  • Get the local data & start a ticker counting from 0 upwards every second
  • Get the data from the remote service
  • The ticker should only complete when the remote service completes
  • If the local service returns an error, it must be ignored and not emitted
  • When the remote service completes, update the local data with the new data, if any

I've gotten this far in a playground:

The local data service:

let localService = Observable<String>.create { (observer) -> Disposable in
    observer.onNext("Local result")
    observer.onCompleted()
    return Disposables.create()
}.observeOn(MainScheduler.instance)

The remote data service:

let remoteService = Observable<String>.create { (observer) -> Disposable in
    observer.onNext("Remote result")
    observer.onCompleted()
    return Disposables.create()
}.observeOn(MainScheduler.instance).delay(5, scheduler: MainScheduler.instance)

The database updater service:

let databaseUpdaterService = Observable<Void>.empty().observeOn(MainScheduler.instance).delay(2, scheduler: MainScheduler.instance)
let remoteAndDatabaseService = remoteService.do(onNext: { (value) in
    print("Database updating")
    _ = databaseUpdaterService.subscribe(onCompleted: {
        print("Database updated")
    })
})

The time ticker:

let ticker = Observable<Int>.interval(1, scheduler: MainScheduler.instance).do(onNext: { (tick) in
    print("Ticker event: \(tick) seconds")
}).takeUntil(remoteService)

The merged service:

let mergedService = Observable<String>.merge([localService, remoteAndDatabaseService])

Starting the ticker:

print("Ticker start")
_ = ticker.subscribe(onError: { (error) in
    print("Ticker error")
}, onCompleted: {
    print("Ticker completed")
}, onDisposed: {
    print("Ticker disposed")
})

Starting the service:

print("Service start")
_ = mergedService.subscribe(onNext: { (value) in
    print("Service value: \(value)")
}, onError: { (error) in
    print(error.localizedDescription)
}, onCompleted: {
    print("Service completed")
}, onDisposed: {
    print("Service disposed")
})

For the happy path, this works perfectly.

However, when localService calls onError (for example, if the local object doesn't exist), the merged stream terminates with that error.
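
A minimal sketch of that failing case. LocalError is a hypothetical error type, and both services are replaced by synchronous stand-ins so the outcome is easy to see:

```swift
import RxSwift

// Hypothetical error type, used only for this illustration.
enum LocalError: Error {
    case notFound
}

// Synchronous stand-ins for the two services; the local one always fails.
let failingLocalService = Observable<String>.error(LocalError.notFound)
let instantRemoteService = Observable.just("Remote result")

// Record every event the merged stream delivers to its subscriber.
var failureEvents: [String] = []
_ = Observable.merge([failingLocalService, instantRemoteService]).subscribe(
    onNext: { failureEvents.append("next: \($0)") },
    onError: { _ in failureEvents.append("error") },
    onCompleted: { failureEvents.append("completed") }
)

// The local error ends the merged stream before the remote value gets through.
print(failureEvents) // ["error"]
```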

I would like the error to be swallowed and nothing to be emitted by the stream for the localService.

Currently the catch operators force you to emit another object instead of failing silently.

Thanks in advance for the help.


Solution

  • If you don't want to bring in the whole RxSwiftExt library for just one thing, you can use .catchError { _ in Observable.empty() }, which swallows the error and emits a completed event instead.

    Another option is .catchError { _ in Observable.never() }, which swallows the error and then emits nothing, not even a completed event. A merged stream that contains it will therefore never complete.
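
    To compare the two options, here is a rough sketch rather than a drop-in implementation. LocalError and the record helper are hypothetical names introduced for the illustration, and the remote service is replaced by a synchronous Observable.just. The catchError spelling matches the RxSwift version used in the question; as far as I know, newer RxSwift releases rename it to catch.

```swift
import RxSwift

// Hypothetical error type, used only for this illustration.
enum LocalError: Error {
    case notFound
}

// Hypothetical helper: merges a local stream with a synchronous stand-in for
// the remote service and records every event the subscriber receives.
func record(_ local: Observable<String>) -> [String] {
    var events: [String] = []
    let remote = Observable.just("Remote result")
    _ = Observable.merge([local, remote]).subscribe(
        onNext: { events.append("next: \($0)") },
        onError: { _ in events.append("error") },
        onCompleted: { events.append("completed") }
    )
    return events
}

let failingLocal = Observable<String>.error(LocalError.notFound)

// empty(): the error becomes a plain completion, so the merge can still complete.
let withEmpty = record(failingLocal.catchError { _ in Observable.empty() })
print(withEmpty) // ["next: Remote result", "completed"]

// never(): the error is swallowed, but the local stream never completes,
// so the merged stream stays open after the remote value arrives.
let withNever = record(failingLocal.catchError { _ in Observable.never() })
print(withNever) // ["next: Remote result"]
```

    For the setup in the question, where the UI presumably waits for "Service completed", the empty() variant is likely the better fit.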