How to write an Rx concatArrayEager equivalent with Kotlin coroutines?


I would like to convert my RxJava code to Kotlin coroutines.

The code below makes both the API call and the DB call and returns data to the UI from whichever responds first. Say the DB response arrives before the API response: the UI could be updated right away, but the API call should still run to completion so that its data can be synced to the DB.

How would I do it?

class MoviesRepository @Inject constructor(val apiInterface: ApiInterface,
                                        val MoviesDao: MoviesDao) {

fun getMovies(): Observable<List<Movie>> {
    val observableFromApi = getMoviesFromApi()
    val observableFromDb = getMoviesFromDb()
    return Observable.concatArrayEager(observableFromApi, observableFromDb)
}

fun getMoviesFromApi(): Observable<List<Movie>> {

    return apiInterface.getMovies()
            .doOnNext { response ->
                // Store the fresh API results in the DB
                response.data?.let { movies -> MoviesDao.insertAllMovies(movies) }
                println("Size of Movies from API ${response.data?.size}")
            }
            .map { response -> response.data }
}

fun getMoviesFromDb(): Observable<List<Movie>> {
    return MoviesDao.queryMovies()
            .toObservable()
            .doOnNext {
                //Print log it.size :)
            }
}

}


Solution

  • As the first step, you should create suspend functions for your ApiInterface and MoviesDao calls. If they expose a callback-based API, you can follow these official instructions.

    You should now have

    suspend fun ApiInterface.suspendGetMovies(): List<Movie>
    

    and

    suspend fun MoviesDao.suspendQueryMovies(): List<Movie>
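
For a callback-based API, one common way to get such a suspend function is suspendCancellableCoroutine from kotlinx.coroutines. The following is only a minimal sketch: MoviesCallback and CallbackApi are hypothetical stand-ins invented for illustration, not the asker's actual ApiInterface, and the fake API answers immediately with fixed data.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

data class Movie(val title: String)

// Hypothetical callback-style API (an assumption, not from the question).
interface MoviesCallback {
    fun onSuccess(movies: List<Movie>)
    fun onError(error: Throwable)
}

class CallbackApi {
    // Fake implementation: replies synchronously with two fixed movies.
    fun getMovies(callback: MoviesCallback) {
        callback.onSuccess(listOf(Movie("Alien"), Movie("Heat")))
    }
}

// Bridges the callback into a suspend function: the coroutine suspends
// until one of the callback methods resumes it.
suspend fun CallbackApi.suspendGetMovies(): List<Movie> =
    suspendCancellableCoroutine { cont ->
        getMovies(object : MoviesCallback {
            override fun onSuccess(movies: List<Movie>) {
                cont.resume(movies)
            }

            override fun onError(error: Throwable) {
                cont.resumeWithException(error)
            }
        })
    }

fun main() {
    runBlocking {
        val movies = CallbackApi().suspendGetMovies()
        println(movies.size) // prints 2
    }
}
```

A real client would usually also register cont.invokeOnCancellation to abort the underlying request when the coroutine is cancelled.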
    

    Now you can write this code:

    launch(UI) {
        val fromNetwork = async(UI) { apiInterface.suspendGetMovies() }
        val fromDb = async(UI) { MoviesDao.suspendQueryMovies() }
        select<List<Movie>> {
            fromNetwork.onAwait { it }
            fromDb.onAwait { it }
        }.also { movies ->
            // act on the movies
        }
    }
    

    The key part is the select call, which awaits both Deferreds simultaneously and acts upon whichever completes first.
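
The snippet above uses launch(UI) and async(UI) as they existed in early kotlinx.coroutines releases. In current releases the UI context has been replaced by Dispatchers.Main, and launch and async are called on a CoroutineScope. The following is a rough, self-contained sketch of the same race under those assumptions. actOnFirstResult and its lambda parameters are hypothetical names, and the delays merely simulate a slow network and a fast DB.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

data class Movie(val title: String)

// Hypothetical helper: starts both calls concurrently and passes the
// result of whichever finishes first to onMovies.
suspend fun actOnFirstResult(
    fetchFromNetwork: suspend () -> List<Movie>,
    queryFromDb: suspend () -> List<Movie>,
    onMovies: (List<Movie>) -> Unit
) {
    coroutineScope {
        val fromNetwork = async { fetchFromNetwork() }
        val fromDb = async { queryFromDb() }
        select<List<Movie>> {
            fromNetwork.onAwait { it }
            fromDb.onAwait { it }
        }.also(onMovies)
        // The slower call is not cancelled here: coroutineScope returns
        // only after it has finished as well.
    }
}

fun main() {
    runBlocking {
        actOnFirstResult(
            fetchFromNetwork = { delay(200); listOf(Movie("from network")) },
            queryFromDb = { delay(20); listOf(Movie("from db")) },
            onMovies = { movies -> println(movies) } // prints [Movie(title=from db)]
        )
    }
}
```

On Android, the caller would typically invoke such a function from a scope bound to Dispatchers.Main so that onMovies can touch the UI.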

    If you want to ensure you always act upon the result from the network, even when the DB responds first, you'll need some more code, for example:

        val action = { movies: List<Movie> ->
            // act on the returned movie list
        }
        var gotNetworkResult = false
        select<List<Movie>> {
            fromNetwork.onAwait { gotNetworkResult = true; it }
            fromDb.onAwait { it }
        }.also(action)
        if (!gotNetworkResult) {
            action(fromNetwork.await())
        }
    

    This code acts upon the DB results only if they arrive before the network results; the network results are processed in all cases.
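
Since the question's getMovies() returns a stream (an Observable), the same behaviour can also be exposed as a Flow. This is a different technique from the select-based code above, shown only as a sketch: moviesFlow and its two lambda parameters are hypothetical names, and the delays in main simulate the two data sources. The flow emits the DB result only if it arrives before the network result, and always emits the network result.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class Movie(val title: String)

// Hypothetical helper: both lambdas stand in for the suspending API and DAO calls.
fun moviesFlow(
    fetchFromNetwork: suspend () -> List<Movie>,
    queryFromDb: suspend () -> List<Movie>
): Flow<List<Movie>> = channelFlow {
    // Read the DB concurrently and emit its result as soon as it is ready.
    val dbJob = launch { send(queryFromDb()) }
    val fromNetwork = fetchFromNetwork()
    // If the DB read is still running, drop it: the network data is fresher.
    // Joining guarantees a DB emission can never follow the network one.
    dbJob.cancelAndJoin()
    send(fromNetwork)
}

fun main() {
    runBlocking {
        val emissions = moviesFlow(
            fetchFromNetwork = { delay(200); listOf(Movie("from network")) },
            queryFromDb = { delay(20); listOf(Movie("from db")) }
        ).toList()
        println(emissions) // prints [[Movie(title=from db)], [Movie(title=from network)]]
    }
}
```

A collector on the UI side would then simply render every emitted list, so a fast DB read gives an early update and the network result replaces it later.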