kotlin, rx-java, kotlin-coroutines

Mix and match Coroutines and RxJava


Coroutines and RxJava3

I have the following method that first calls a suspend function and then, in the same launch scope, makes two RxJava calls.

I am wondering if there is a way to move the RxJava code out of the viewModelScope.launch scope and return the result of fetchRecentUseCase.execute().

Basically, is it possible for viewModelScope.launch to return listOfProducts, rather than doing everything inside the launch scope?

fun loadRecentlyViewed() {
    viewModelScope.launch {
        val listOfProducts = withContext(Dispatchers.IO) {
            fetchRecentUseCase.execute()
        }

        val listOfSkus = listOfProducts.map { it.sku }

        if (listOfSkus.isNotEmpty()) {
            loadProductUseCase.execute(listOfSkus)
                .subscribeOn(schedulersFacade.io)
                .flatMap(convertProductDisplayUseCase::execute)
                .map { /* work being done */ }
                .observeOn(schedulersFacade.ui)
                .subscribeBy(
                    onError = Timber::e,
                    onSuccess = { }
                )
        }
    }
}

Use case for the suspend method

class FetchRecentUseCaseImp() {
    override suspend fun execute(): List<Products> {
        // Call to network 
    }
}

Many thanks in advance


Solution

  • With coroutines, the way to return a single item that is produced asynchronously is to use a suspend function. So instead of launching a coroutine, you mark the function as suspend and wrap any blocking or callback-based calls so that they suspend rather than block.

    Coroutines are typically launched at UI interactions (click listeners) or when classes are first created (on Android, in places like a ViewModel constructor or a Fragment's onViewCreated()).

    As a side note, it is against convention for a suspend function to expect its caller to specify a dispatcher. It should switch dispatchers internally if it needs to, for example:

    class FetchRecentUseCaseImp() {
        override suspend fun execute(): List<Products> = withContext(Dispatchers.IO) {
            // Synchronous call to network 
        }
    }
    

    But if you were using a library like Retrofit, you'd simply make your request and await() it without specifying a dispatcher, because await() is itself a suspend function.
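
    To illustrate why such an await() needs no dispatcher, here is a minimal sketch of how a callback-based request can be bridged into a suspend function with suspendCancellableCoroutine. FakeCall and its Callback interface are hypothetical stand-ins invented for this example; they are not Retrofit's types, and the sketch assumes kotlinx-coroutines-core is on the classpath.

    ```kotlin
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.suspendCancellableCoroutine
    import kotlin.concurrent.thread
    import kotlin.coroutines.resume
    import kotlin.coroutines.resumeWithException

    // Hypothetical callback-based request; a stand-in, not Retrofit's Call.
    class FakeCall(private val skus: List<String>) {
        interface Callback {
            fun onSuccess(result: List<String>)
            fun onFailure(error: Throwable)
        }

        // Delivers the result from a background thread, as an async HTTP client might.
        fun enqueue(callback: Callback) {
            thread {
                callback.onSuccess(skus.map { "product-$it" })
            }
        }
    }

    // Suspends until the callback fires; the calling thread is not blocked meanwhile.
    suspend fun FakeCall.await(): List<String> = suspendCancellableCoroutine { cont ->
        enqueue(object : FakeCall.Callback {
            override fun onSuccess(result: List<String>) = cont.resume(result)
            override fun onFailure(error: Throwable) = cont.resumeWithException(error)
        })
    }

    fun main() = runBlocking {
        val products = FakeCall(listOf("a1", "b2")).await()
        println(products) // prints [product-a1, product-b2]
    }
    ```

    The work happens on whatever thread the callback API uses, so the caller can invoke await() from any dispatcher, including the main one.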

    So your function should look something like:

    suspend fun loadRecentlyViewed(): List<SomeProductType> {
        val listOfSkus = fetchRecentUseCase.execute().map { it.sku }
        if (listOfSkus.isEmpty()) {
            return emptyList()
        }
        return try {
            loadProductUseCase.execute(listOfSkus) // A Single, I'm assuming
                .await() // From kotlinx-coroutines-rx3; only if you're not completely stripping Rx from project
                .map { convertProductDisplayUseCase.execute(it).await() } // Ditto for await()
                .toList()
                .flatten()
        } catch (e: CancellationException) {
            throw e // Rethrow so coroutine cancellation still propagates
        } catch (e: Exception) {
            Timber.e(e)
            emptyList()
        }
    }
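
    As a rough end-to-end sketch of the idea above, the following shows a suspend function that awaits two RxJava3 Singles and a call site that launches it. Everything here is an assumption made for illustration: Product, ProductDisplay, fetchRecent, loadProducts and convertToDisplay are hypothetical stand-ins for the use cases in the question, the conversion step takes the whole list (mirroring the flatMap in the question rather than the per-item map above), and a plain CoroutineScope stands in for viewModelScope. It assumes RxJava 3 and kotlinx-coroutines-rx3 are on the classpath.

    ```kotlin
    import io.reactivex.rxjava3.core.Single
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.rx3.await

    // Hypothetical stand-ins for the types and use cases in the question.
    data class Product(val sku: String)
    data class ProductDisplay(val title: String)

    suspend fun fetchRecent(): List<Product> = listOf(Product("a1"), Product("b2"))

    fun loadProducts(skus: List<String>): Single<List<Product>> =
        Single.just(skus.map(::Product))

    fun convertToDisplay(products: List<Product>): Single<List<ProductDisplay>> =
        Single.just(products.map { ProductDisplay("Product ${it.sku}") })

    // Returns the result to the caller instead of subscribing inside launch.
    suspend fun loadRecentlyViewed(): List<ProductDisplay> {
        val skus = fetchRecent().map { it.sku }
        if (skus.isEmpty()) return emptyList()
        val products = loadProducts(skus).await() // suspends until the Single emits
        return convertToDisplay(products).await()
    }

    fun main() = runBlocking {
        // Stand-in for viewModelScope; on Android the ViewModel would own the scope.
        val scope = CoroutineScope(Dispatchers.Default)
        val job = scope.launch {
            val displays = loadRecentlyViewed()
            println(displays.map { it.title }) // prints [Product a1, Product b2]
        }
        job.join()
    }
    ```

    The launch call stays at the entry point (here main, in an app the ViewModel or click listener), while the data-loading logic is an ordinary suspend function that can be called and tested on its own.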