Tags: java, android, kotlin, rx-java, rx-android

Fetching data from local and remote simultaneously using RxJava


So I'm a beginner with RxJava but here's what I want to accomplish:

MainViewModel talks to the Repository. The Repository has both a LocalDataStore (which talks to the database) and a RemoteDataStore (Retrofit). Both are implementations of the DataStore interface.

What I want is a single call, fetchData, on the Repository that returns an Observable, but:

  • it takes it from the RemoteDataStore at first
  • after fetching every single thing (onNext()), it inserts it into the database
  • if it fails, it returns results from the LocalDataStore.

However, I don't know how to implement this logic. The subscription happens on the ViewModel's end, and I don't see how to switch the observable over to the LocalDataStore from the Repository's end. Upserting data into the database also returns an Observable (a Single, to be precise), and it needs a subscription to do anything.

Could someone explain it to me or point me in a good direction?

My code (problem in repository comments):

Remote data store

 override fun getData(): Observable<SomeData> = api
    .getData(token)
    .flatMapIterable { x -> x }

Local data store

override fun saveData(data: SomeData): Single<SomeData> {
    return database.upsert(data)
}

Repository

fun getData(): Observable<SomeData> {
    // The chain must start on the same line as `return`;
    // a bare `return` followed by a newline returns Unit in Kotlin.
    return remoteDataStore.getData()
            .doOnError {
                localDataStore.getData() //? this will invoke but nothing happens because I'm not subscribed to it
            }
            .doOnNext {
                saveData(it) //The same as before, nothing will happen
            }
}

ViewModel

override fun fetchData() {
    repository.getData()
        .observeOn(androidScheduler)
        .subscribeOn(threadScheduler)
        .subscribe(
                { data: SomeData ->
                    dataList.add(data)
                },
                { throwable: Throwable? ->
                    handleError(throwable)
                },
                {
                    //send data to view
                },
                { disposable: Disposable ->
                    compositeDisposable.add(disposable)
                }
        )
}

Thank you for your time.


Solution

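  • You need one of the onErrorResumeNext operators. Unlike doOnError, which only runs a side effect, onErrorResumeNext switches the subscriber over to a fallback stream when the source fails. I would also suggest changing the stream type from Observable to Single, since your data looks like "get the data once or throw an error". It's just good API design.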

    In your particular case I would implement the repository this way:

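    // Imports assume RxJava 2 (io.reactivex).
    import io.reactivex.Single
    import java.io.IOException
    import javax.inject.Inject

    class RepositoryImpl @Inject constructor(private val localRepository: Repository, private val remoteRepository: Repository) : Repository {
        override fun getData(): Single<Data> = remoteRepository.getData()
            .onErrorResumeNext { throwable ->
                // A bare `return` is not allowed inside a lambda, so if/else is used as an expression.
                if (throwable is IOException) {
                    localRepository.getData() // network failure: fall back to local data
                } else {
                    Single.error<Data>(throwable) // anything else is propagated
                }
            }
    }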
    

    You might ask why I catch only IOException. I usually handle just this exception so that the fallback covers unimportant network errors without hiding anything critical. If you catch every exception, you might miss, for example, a NullPointerException.
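
The answer above covers the fallback but not the "save every item" part of the question. Here is a minimal sketch of how both could fit together while keeping the question's Observable shape. It assumes RxJava 2 (the io.reactivex package). The DataStore interface is reconstructed from the question's snippets, and FakeStore is a purely hypothetical in-memory stand-in used to exercise the chain, so treat the names as illustrative rather than as the asker's real code.

```kotlin
import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.Single
import io.reactivex.functions.Function
import java.io.IOException

data class SomeData(val id: Int)

// Assumed shape of the question's DataStore interface.
interface DataStore {
    fun getData(): Observable<SomeData>
    fun saveData(data: SomeData): Single<SomeData>
}

class Repository(
    private val localDataStore: DataStore,
    private val remoteDataStore: DataStore
) {
    fun getData(): Observable<SomeData> =
        remoteDataStore.getData()
            // flatMapSingle subscribes to each upsert Single as part of the
            // chain, so no separate subscription is needed.
            .flatMapSingle { item -> localDataStore.saveData(item) }
            // The explicit Function type sidesteps Kotlin's overload ambiguity
            // with onErrorResumeNext(ObservableSource).
            .onErrorResumeNext(Function<Throwable, ObservableSource<SomeData>> { error ->
                if (error is IOException) localDataStore.getData()
                else Observable.error<SomeData>(error)
            })
}

// Hypothetical in-memory store, only here to try the sketch out.
class FakeStore(private val source: Observable<SomeData>) : DataStore {
    val saved = mutableListOf<SomeData>()
    override fun getData(): Observable<SomeData> = source
    override fun saveData(data: SomeData): Single<SomeData> =
        Single.fromCallable { saved.add(data); data }
}

fun main() {
    val local = FakeStore(Observable.just(SomeData(1)))
    val offline = FakeStore(Observable.error<SomeData>(IOException("no network")))
    // The remote store fails with an IOException, so the local data is emitted instead.
    println(Repository(local, offline).getData().toList().blockingGet())
}
```

One thing to keep in mind: onErrorResumeNext continues the same sequence, so if the remote source emits a few items and then fails, the subscriber receives those items followed by everything from the local store. If the ViewModel appends each onNext to a list, it may need to clear or de-duplicate it in that case.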