Search code examples
androidrx-javarx-java2rx-android

RxJava - Combining the results of multiple Singles and return as one


I've got the following problem: I have a Single that returns a list of objects, inside each object, there is a field that is also a Single, and that returns a list of items (which is what i need), but i also need this list to be returned as one result, rather than as each individual single..

I have an Observable that i can use which is wrapping all of this code, so i would like to emit the result in onNext.

Additionally, I also need to add the individual results of each Single into a cache, with the key being the parent object

Code:

object.getListOfObjects().doOnSuccess { objectList ->
                      objectList.map { singleItemInList ->
                          singleItemInList.listOfValues.subscribe({ valueList ->
                              cache[singleItemInList] = valueList
                              emitter.onNext(valueList)
                            // ^ this is the bit where i need to get all values instead of 
                            //   emitting on next for each item
                          },{
                              emitter.onError(it)
                          })
                      }
                  }.subscribe( {
                   /* no-op*/

                  }, {
                      emitter.onError(it)
                  })

Any help would be appreciated!!

Thanks in advance :)

EDIT - Extra details:

fun getListOfObjects(): Single<List<Object>> =
        Single.zip(
            allObjects().map { it.objects() }
        ) { t: Array<Any> ->
            t.map { it as Object }
        }

Object {
    [...]
    val listOfValues: Single<ValueList>
    [...]
}

Solution

  • I've found an approach, but it is far from pretty... Any suggestions as to how this could be implemented more prettily would be greatly appreciated! :D

    object.getListOfObjects().doOnSuccess { objectList ->
                        val list = objectList.map { singleItemInList ->
                                  singleItemInList.listOfValues.subscribe({ valueList ->
                                      cache[singleItemInList] = valueList
                                  },{
                                      emitter.onError(it)
                                  })
                              }
                          }.flatten().map {
                            it.toObservable()
                          }
                        Observable.zip(list) { itemList ->
                          val compoundList = mutableListOf<Value>()
                          for (i in itemList.indices) {
                            val singleList = itemList[i] as? List<Value>
                            singleList?.map { item ->
                                compoundList.add(item)
                            }
                        }
                        emitter.onNext(compoundList) 
                        // ^ this now emits the entire list instead of 
                        // each sublist individually
                        emitter.onComplete()
                    }.subscribe({
                       /* no-op */
                    }, {
                       emitter.onError(it)
                    })
                }.subscribe({
                    /* no-op */
                }, {
                   emitter.onError(it)
                })