Search code examples
javaandroidkotlinrx-javarx-java2

Combine 2 data sources in RxJava with possibility to fetch new data from 1 of the observables


I'm currently working on an Android app where I'm using RxJava in a ViewModel to handle data from both a local Room database and a remote API (Retrofit). I have implemented a pull-to-refresh feature, and in the onRefresh method, I'm currently creating a new observable to fetch data from the remote API.

The problem is I want the combined observable to be triggered when user does pull-to-refresh, but it's not triggered.

What may be the issue?

@HiltViewModel
class MyViewModel @Inject constructor(
    private val remoteDataSource: RemoteDataSource,
    private val localDataSource: LocalDataSource,
) {

    private val remoteDataObservable = BehaviorSubject.createDefault<List<Data>>(emptyList())

    init {
        disposables += Observable.combineLatest(
            localDataSource.getData(),
            remoteDataObservable,
            { local, remote ->
                local.plus(remote)
            }
        )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                displayList(it)
            }
        fetchLatestData()
    }

    fun onPullToRefresh() {
        fetchLatestData()
    }
    
    private fun fetchLatestData() {
        remoteDataSource.fetchLatestData()
            .subscribeOn(Schedulers.io())
            .subscribe(remoteDataObservable)
    }

    private fun displayList(list: List<Data>) {
        // Display list
    }

}

Solution

  • I've managed to fix my problem by changing a way to subscribe to the Observable, which fetches data from the API. Now I created a new subscriber, end emit data to the BehaviorSubject manually. I'm not satisfied with the solution, but nothing better came to my mind:

    private fun fetchLatestData() {
        disposables += remoteDataSource.fetchLatestData()
            .subscribeOn(Schedulers.io())
            .subscribe({
                remoteDataObservable.onNext(it)
            }, {})
    }