Search code examples
androidkotlinrealmrx-java2

Rxjava 2 - fetch data with realm and retrofit


Task: I want to fetch local data first. If returned data is null I want to call API service.

Presenter:

private fun getData() {
    val disposable = dataRepository.getDataFromRepository(String: itemId)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({ _ ->
                // I got my data
            }, {
                // error
            })
    compositeDisposable.add(disposable)
}

DataRepository:

fun getDataFromRepository(itemId: String): Single<Data> {
    val dataAsSingle: Single<Event>

    //fetch data locally
    dataAsSingle = getDataLocally(itemId)

    //if nothing was found locally, call api service to fetch data
    if (dataAsSingle == null) {
        getDataFromApi(itemId)
    }

    return dataAsSingle
}

private fun getDataLocally(itemId: String): Single<Data> {
    var data: Data? = null

    val realm = Realm.getDefaultInstance()
        data = realm.where(Data::class.java).equalTo(itemId)
    // some other logic..
    realm.close()

    return data?.let {
        Single.just(data)
    } ?: Single.error(NoSuchEventFoundException())
}

private fun getDataFromApi(itemId: String): Single<Data> {
    return dataService.getDataFromApiCall(itemId)
            .onErrorResumeNext(ErrorHandler(BaseErrorParser()))
            .map { apiResponse ->
                    //some logic blah blah blah...
                    return@map data
                } 
            }
}

Well, I think my approach is wrong, but I can't initally tell you why... This works okay, but it seems that it skips main points of RxJava. Maybe someone can explain how to solve this problem with correct pattern and approach using RxJava 2?

Updated

DataRepository:

fun getDataFromRepository(itemId: String): Single<Data> {

    val localData: Observable<Data> = getDataLocally(itemId)
    val remoteData: Observable<Data> = getDataFromApi(itemId)
           .doOnNext(

            })

    return Observable
            .concat(localEvent, remoteEvent)
            .first()
}

private fun getDataLocally(itemId: String): Observable<Data> {
    var data: Data? = null

    val realm = Realm.getDefaultInstance()
        data = realm.where(Data::class.java).equalTo(itemId)
    // some other logic..
    realm.close()

    return Observable.just(data)
}

private fun getDataFromApi(itemId: String): Observable<Data> {
    return dataService.getDataFromApiCall(itemId)
            .map { apiResponse ->
                    //some logic blah blah blah...
                    return@map data
                } 
            }
}

Solution

  • Actually, the proper way to do it with Realm would be to listen for changes in the Realm, and update the Realm from a background thread from the API if the Realm doesn't contain what you're looking for

    (with limitation of using this method from UI thread, in order to receive RealmResults to UI thread - this way you can use async transaction and listen for changes in Realm):

    private fun getData() {
        val disposable = dataRepository.getDataFromRepository(String: itemId)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ _ ->
                    // I got my data
                }, {
                    // error
                })
        compositeDisposable.add(disposable)
    }
    

    Where:

    fun getDataFromRepository(itemId: String): Observable<Data> =
        Observable.create { emitter ->
            val realm = Realm.getDefaultInstance()
            val results = realm.where<Data>().equalTo("itemId", itemId).findAllAsync()
            val listener = RealmChangeListener { results ->
                if(!emitter.isDisposed()) {
                    emitter.onNext(results)
                }
            }
            emitter.setDisposable(Disposable {
                results.removeChangeListener(listener)
                realm.close()
            })
            results.addChangeListener(listener)
        }.subscribeOn(AndroidSchedulers.mainThread())
        .doOnNext { results ->
            if(results.isEmpty()) {
                dataService.getDataFromApiCall(itemId)
                           // .toSingle() // ensure this completes automatically, but only if it's not a single already
                           .subscribeOn(Schedulers.io())
                           .subscribeWith(object: DisposableObserver<>() {
                               fun onNext(data: Data) {
                                   Realm.getDefaultInstance().use { realm ->
                                      realm.executeTransaction { realm ->
                                          realm.insert(data)
                                       }
                                   }
                               }
    
                               fun onError(throwable: Throwable) {
                                   ... /* do something */ 
                               }
                          )
            }
        }
    }