Search code examples
androidrx-javareactive-programmingrx-kotlin

Fetching a Single<List> for all elements of another Single<List> in RxKotlin


A, B, C are objects

All function calls are made to a Rooms DB

This code snippet is inside a ViewModel

repo = Repository

So I'm making an android app (can't provide details) and for a particular screen I need to do the following.

My first call is repo.getInfo, which returns a Single Observable ListOfA: Single<List<A>> //perform some operations

for every element of ListOfA I need to call another function repo.getB(A) which returns a Single Observable ListOfB: Single<List<B>> //perform some operations

for every element of ListOfB I need to call another function repo.getC(B) which returns a Single Observable ListOfC: Single<List<C>> //perform some operations

after I have the required data I need to call another function that combines the data to display on the UI.

Now I can't get this to work. Here's what I've tried. But the flow stops at the line marked THIS LINE and jumps to subscribe block. Individual calls to the functions work so the data is not the problem. I'm pretty new at this and quite frankly out of my depth. Any help or hint is appreciated. Thanks

localListOfA = emptyList<A>()
localListOfB = emptyList<B>()
localListOfC = emptyList<C>()
compositeDisposable.add(
    getInfo.map{listOfA ->
        localListOfA.addAll(listofA)
        listOfA.map {elementA ->   ////THIS LINE
            getB(elementA.id).map{listOfB ->
                listOfB.filter {
                    //some logic to select a few objects
                }
            }.map { it // filtered list of B
                localListofB.addAll(it)
                localListOfB.last() //I only need the top element of this list
            }.map{elementB ->
                getC(elementB.id).map{ listOfC ->
                    localListOfC.addAll(listOfC)
                    //do some operations
                }
            }
        }
    }
    .subscribeOn(DEFAULT_CACHED_SCHEDULERS)
    .observeOn(AndroidSchedulers.mainThread())
    .doOnError(/*take log*/)
    .subscribe{
        prepareUi()
    }
)

Solution

  • You can flatten a List into an Observable using .flattenAsObservable

    getInfo // Single<List<A>>
        .doOnSuccess { localListOfA.addAll(it) } // Side effect, adding to localListOfA
        .flattenAsObservable { it } // Observable<A>
        .flatMapSingle { elementA -> getB(elementA.id) } // Observable<List<B>>
        .map { it.filter { true } } // Some logic to select a few objects from B
        .doOnNext { localListOfB.addAll(it) } // Side effect, adding to localListOfB
        .map { it.last() } // Observable<B>, only the last element
        .flatMapSingle { elementB -> getC(elementB.id) } // Observable<List<C>>
        .doOnNext { localListOfC.addAll(it) } // Side effect, adding to localListOfC
        .flatMapIterable { it } // Observable<C>
    

    Now, you mentioned you need to combine this data somehow. In Rx you can nest chains in order to access the intermediate data. For example, if you have a call that returns a Single<Foo> and you need Foo for the function getBar(foo: Foo): Single<Bar>, one way of achieving this is as follows:

    getFoo().flatMap { foo -> // .concatMap, .switchMap
        getBar(foo).map { bar ->
            // Use both foo and bar
        }
    }