Search code examples
androidkotlinrx-javaretrofit2reactivex

How to execute more than 3 parallel http requests using RxJava (+ retrofit2) on Android


I'm trying to learn how to work with RxJava (io.reactivex.rxjava2:rxandroid:2.0.1) + retrofit2 + kotlin.

How Observable is involved in this, in general, figured out, even learned how to combine the results of 2-3 parallel queries based on the performance of all with the help of Observable.zip, BUT how to perform similarly more than 3 parallel queries I can not understand.

The source code for the Observable class has a method

static <T1, T2, T3, T4, R> Observable <R> zip (),

but it does not work like

static <T1, T2, T3, R> Observable <R> zip ()

in the case of three Observable inputs and Function3 <*, *, *, *>.

I tried already to make the first three queries using zip, then flatMap and so on, but it still does not work. Reading documentation and examples does not help to turn to the right direction. I looked towards Observable.combineLatest, but came to the conclusion (it may be erroneous) that the method would return the result of the first Observable that was executed.

What I want:

I have 4 methods that return Observable:

  • this.orderRepository.getStatuses (): Observable
  • this.orderRepository.getOrders (): Observable
  • this.userRepository.getUsers (): Observable
  • this.orderRepository.getTypes (): Observable

Prompt, please, how should I compose these methods so that they run in parallel, and at the end get the result of all 4 methods in one place?

It's already messed up, but I generally do not understand what I should do with the 4 Observables, so that they all in one place and return to the UI. + already confused with flatMap, map and so on ...

Please excuse me for mistakes


Solution

  • What you need is zip, probably the problem that you had is because zip will only emit when all observables have emitted.

    If you need to receive the updates as soon as you get them, I think that your best bet is to map the answers to a common type, for example:

    class Response(val statuses: Statuses? = null, val orders: Orders? = null, val users: Users? = null, val types: Types? = null)
    
    this.orderRepository.getStatuses().map(Response(statuses = it))
    

    After you do this to all observables, you will be able to use concat or merge who will emit as soon as any observable emit.

    More info:

    http://www.introtorx.com/uat/content/v1.0.10621.0/12_CombiningSequences.html

    http://reactivex.io/documentation/operators/concat.html

    https://code.i-harness.com/en/q/1bddcd2