rx-java rx-kotlin

Using data from another observable


Either I'm failing to get into the ReactiveX mindset, or the codebase I am working in is just poorly written.

Suppose I have some Observable A() and I need data from another Observable B() in order to validate the data coming through A. How do I accomplish this in RxJava (I would prefer an RxKotlin implementation)? Note that both A and B return a Single of a List of objects.

fun B(): Single<List<Bar>> {
  ...
}

fun A() : Single<List<Foo>> {
  Single.just(readRecords()).map { record ->
    // val bar = B.getAll()??? This seems like an anti-pattern and I'm not sure if it would necessarily be right to .subscribe()???

    if (bar.contains(record)) {
      // ... some validation
    }
  }
}

Update 1: I should emphasize that the validation requires multiple sources, so you could have B, C, D, etc.


Solution

  • You can use the .map {} operator on B's result and do the validation inside it:

    fun A() : Single<List<Foo>> {
        return B.getAll().map { allFromB ->  
            val record = readRecords()
            if (allFromB.contains(record)) {
               // ... some validation
            }
            ...
        }
    }
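
    To make the snippet above concrete, here is a minimal runnable sketch. It assumes RxJava 3 (with RxJava 2 the import would be io.reactivex.Single), and the Foo and Bar classes, their id field, the sample data, and the "keep only records known to B" rule are all hypothetical stand-ins for the real validation:

    ```kotlin
    import io.reactivex.rxjava3.core.Single

    // Hypothetical model classes; the real Foo and Bar are not shown in the question.
    data class Foo(val id: Int)
    data class Bar(val id: Int)

    // Hypothetical stand-ins for the real data sources.
    fun readRecords(): List<Foo> = listOf(Foo(1), Foo(2), Foo(3))
    fun B(): Single<List<Bar>> = Single.just(listOf(Bar(1), Bar(3)))

    // A() is derived from B(): the validation runs inside map, once B has emitted.
    fun A(): Single<List<Foo>> =
        B().map { allFromB ->
            val knownIds = allFromB.map { it.id }.toSet()
            // Example validation rule (an assumption): keep records whose id appears in B.
            readRecords().filter { it.id in knownIds }
        }

    fun main() {
        println(A().blockingGet()) // prints [Foo(id=1), Foo(id=3)]
    }
    ```

    Nothing subscribes to B inside A here; the caller of A() subscribes once and both steps run as a single chain.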
    

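    UPDATE: If you have several such sources, use Observables.combineLatest() or Observable.combineLatest() (depending on the Rx version you use). Because combineLatest operates on Observables, Single sources such as B, C, and D must first be converted with .toObservable() (Single.zip() is the Single-based alternative). The general shape is: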

    fun B(): Single<List<BarB>> {
       ...
    }
    
    fun C(): Single<List<BarC>> {
       ...
    }
    
    fun D(): Single<List<BarD>> {
       ...
    }
    
    fun A() : Single<List<Foo>> {
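        // Note: combineLatest expects Observable sources and returns an Observable;
        // with Singles, convert via .toObservable() or use Single.zip() instead.
        return Observable.combineLatest(B.getAll(), C.getAll(), D.getAll()) { barB, barC, barD -> 
                val record = readRecords()
                // Do your stuff here with all these values and return List<Foo>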
            }
    }
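
Since B, C, and D all return a Single, the multi-source case can also be sketched with Single.zip, which waits for every source and then calls the combiner once. This is a minimal sketch rather than a definitive implementation: it assumes RxJava 3 and a recent Kotlin compiler (older compilers may need an explicit Function3 wrapper or RxKotlin's Singles.zip), and the data classes, sample data, and "id must appear in every source" rule are hypothetical:

```kotlin
import io.reactivex.rxjava3.core.Single

// Hypothetical model classes standing in for the real ones.
data class Foo(val id: Int)
data class BarB(val id: Int)
data class BarC(val id: Int)
data class BarD(val id: Int)

// Hypothetical stand-ins for the real data sources.
fun readRecords(): List<Foo> = listOf(Foo(1), Foo(2), Foo(3), Foo(4))
fun B(): Single<List<BarB>> = Single.just(listOf(BarB(1), BarB(2), BarB(3)))
fun C(): Single<List<BarC>> = Single.just(listOf(BarC(2), BarC(3), BarC(4)))
fun D(): Single<List<BarD>> = Single.just(listOf(BarD(3), BarD(2)))

// zip subscribes to B, C and D and invokes the lambda once all three have emitted.
fun A(): Single<List<Foo>> =
    Single.zip(B(), C(), D()) { barB, barC, barD ->
        val idsB = barB.map { it.id }.toSet()
        val idsC = barC.map { it.id }.toSet()
        val idsD = barD.map { it.id }.toSet()
        // Example validation rule (an assumption): the id must be present in every source.
        readRecords().filter { it.id in idsB && it.id in idsC && it.id in idsD }
    }

fun main() {
    println(A().blockingGet()) // prints [Foo(id=2), Foo(id=3)]
}
```

If any of the three sources fails, the zipped Single fails with that error, so the validation never runs on partial data.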