android rx-java2 android-room collect rx-kotlin

Building a list from two observable sources in RxKotlin/RxJava using collectInto


I have a Category data class and a Plan data class. Each Category has a list of plan ids. Categories and Plans are stored via Room. I am trying to construct a local List<Any> to which I add each category, followed by each of its plans.

So for each category, add the category to the list, then add each plan that belongs to that category.

The final result would look something like this...

0 -> a Category
1 -> a Plan
2 -> a Plan
3 -> a Plan
4 -> a Category
5 -> a Plan

etc.

The following calls successfully return an Observable<List<Category>> and an Observable<Plan>, respectively:

AppDatabase
   .getDatabase(context)
   .categoryDao()
   .getAll()

AppDatabase.getDatabase(context).planDao().getPlan(planId)

Here I am trying to build my list, but it never emits when I subscribe to it: no completion and no error. Everything else in the stream gets hit. Why can't I get my final result?

fun fetchCategoriesAndPlans() {
    val items = mutableListOf<Any>()
    AppDatabase
        .getDatabase(context)
        .categoryDao()
        .getAll()
        .concatMap { listOfCategories ->
            listOfCategories.toObservable()
        }
        .doOnNext { category ->
            items.add(category)
        }
        .concatMap { category ->
            category.getPlanIds()!!.toObservable()
        }
        .flatMap { planId ->
            AppDatabase.getDatabase(context).planDao().getPlan(planId)
        }.collectInto(items, BiConsumer{ list, i ->
            Log.d(TAG, "Collect into")
            list.add(i)
        })
        .subscribeBy(
            onSuccess = {
                Log.d(TAG, "Got the list")
            },
            onError = {
                Log.e(TAG, "Couldn't build list ${it.message}", it)
            })
}

Solution

  • I made a demo based on your case which emits both Category and Plan:

    override fun onCreate(savedInstanceState: Bundle?) {
        ...
    
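        getCategories()
            .flattenAsObservable { it }
            // concatMap (rather than flatMap) keeps each category's block in source order
            .concatMap { getPlanWithCategory(it) }
            .toList()
            .subscribe({ items ->
                for (item in items) {
                    Log.i("TAG", " " + item.javaClass.canonicalName)
                }
            }, { error ->
                // Log failures instead of silently swallowing them
                Log.e("TAG", "Couldn't build list", error)
            })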
    }
    
    fun getPlanWithCategory(category: Category): Observable<Any> {
        // Look up this category's plans one at a time, in plan-id order
        val getPlansObservable = Observable.fromIterable(category.planIds)
            .concatMap { getPlan(it).toObservable() }
        // Emit the category first, then its plans
        return Observable.concat(Observable.just(category), getPlansObservable)
    }
    
    
    fun getPlan(planId: String): Single<Plan> {
        return Single.just(Plan())
    }
    
    fun getCategories(): Single<List<Category>> {
        val categories = arrayListOf<Category>()
        categories.add(Category(arrayListOf("1", "2", "3")))
        categories.add(Category(arrayListOf("1", "2")))
        return Single.just(categories)
    }
    
    class Category(val planIds: List<String>)
    
    class Plan
    

    Output

     I/TAG:  Category
     I/TAG:  Plan
     I/TAG:  Plan
     I/TAG:  Plan
     I/TAG:  Category
     I/TAG:  Plan
     I/TAG:  Plan
    

    Hope it helps.
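
    As for why the original chain stays silent: collectInto (like toList) only emits once the upstream completes, and as far as I know the Observable that Room returns for a query keeps listening for table changes and does not complete. Below is a minimal sketch of one way around that, not the asker's actual DAO code. It takes a single snapshot from each source with firstOrError() and take(1). The name field on Category, categoriesSource and planSource are hypothetical stand-ins (BehaviorSubjects that never complete), and RxJava 2 is assumed to be on the classpath.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.subjects.BehaviorSubject

data class Category(val name: String, val planIds: List<String>)
data class Plan(val id: String)

// Hypothetical stand-ins for the Room DAOs. A BehaviorSubject emits its
// current value and then stays open, so these sources never complete.
val categoriesSource: Observable<List<Category>> =
    BehaviorSubject.createDefault<List<Category>>(
        listOf(Category("A", listOf("1", "2", "3")), Category("B", listOf("4")))
    )

fun planSource(planId: String): Observable<Plan> =
    BehaviorSubject.createDefault(Plan(planId))

fun fetchCategoriesAndPlans(): Single<List<Any>> =
    categoriesSource
        .firstOrError()                 // one snapshot, so the chain can complete
        .flattenAsObservable { it }
        .concatMap { category ->
            Observable.fromIterable(category.planIds)
                .concatMap { planSource(it).take(1) }  // first value of each plan only
                .cast(Any::class.java)
                .startWith(category)    // the category goes in front of its plans
        }
        .toList()

fun main() {
    fetchCategoriesAndPlans().blockingGet().forEach { println(it) }
}
```

    With the stand-in data the resulting list holds category A, plans 1 to 3, category B, then plan 4, in that order. The trade-off is that the result is a one-off snapshot: later table changes are not reflected unless you subscribe again.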