java kotlin functional-programming rx-java rx-java2

Nested Single.flatMap vs Single.zip in RxJava: are they the same?


I'm confused about the following. Given, for example, several Singles:

val s1 : Single<String> = service1.execute().subscribeOn(io())
val s2 : Single<Int> = service2.execute().subscribeOn(io())
val s3 : Single<Int> = service3.execute().subscribeOn(io())
val s4 : Single<String> = service4.execute().subscribeOn(io())
val ....
val s10 : Single<Int> = service10.execute().subscribeOn(io())

data class MyObj(val field1: String, val field2: Int, val field3: Int, val field4: String, ..., val field10: Int)

and I have a service10.execute(s1: String, s2: Int, s3: Int, s4: String)

If I do:

s1.flatMap { str -> 
    s2.flatMap { int1 ->
        s3.flatMap { int2 ->
            s4.flatMap { str2 ->
                ...
                s10.flatMap { int10 ->
                  service10.execute(MyObj(str, int1, int2, str2..., int10))
                }
            }
        }
    }
}

Is the same as doing:

Single.zip(
            listOf(
                s1,
                s2,
                s3,
                s4,
                ...,
                ...,
                s10
            )
        ) { array ->
            val str = array[0] as String
            val int1 = array[1] as Int
            val int2 = array[2] as Int
            val str2 = array[3] as String
            ...
            val int10 = array[9] as Int
        }

1) Do the nested flatMaps execute in parallel or sequentially? 2) If the nested flatMaps are sequential, is there a way to make them run in parallel like the zip?


Solution

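  • No, the nested flatMaps do not make the Singles run in parallel, as shown by the following test, whose assertion fails because the inner Single is never subscribed while the outer one is still running: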

        // so we can be sure service1 and service2 are active
        val bothSubscribed = CountDownLatch(2)
        // so we can simulate a blocking, long running operation on both services
        val subscribeThreadsStillRunning = CountDownLatch(1)
    
        val service5 = { str: String, str2: String ->
            Observable.just("service5: $str, $str2").singleOrError()
        }
    
        val scheduler = Schedulers.io()
    
        val createSingle = { value: String ->
            Observable
                .create<String> { emitter ->
                    println("subscribe $value on ${Thread.currentThread().name}")
                    bothSubscribed.countDown()
                    subscribeThreadsStillRunning.await(10, SECONDS)
                    emitter.onNext(value)
                    // singleOrError() only emits once the upstream has completed
                    emitter.onComplete()
                }
                .singleOrError()
                .subscribeOn(scheduler)
        }
    
        val s1 = createSingle("outer")
        val s4 = createSingle("inner")
    
        s1.flatMap { outer ->
            s4.flatMap { inner ->
                service5(outer, inner)
            }
        }.subscribe()
    
        assert(bothSubscribed.await(5, SECONDS))
        subscribeThreadsStillRunning.countDown()
    

    The reason becomes clear once you remember that code inside a lambda does not run until the lambda is invoked (it seems obvious put like that, but it took me a bit of thinking to get it). The call to s4.flatMap is what triggers the subscription to s4, but that call does not execute until outer is available, i.e. until s1 has already emitted and has therefore completed.
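To make that ordering visible without threads or latches, here is a minimal sketch using plain RxJava 3. The `traced` helper and the `nestedOrder` function are hypothetical names introduced only for this illustration; they record when each Single is subscribed and when it emits.

```kotlin
import io.reactivex.rxjava3.core.Single
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: a Single that logs its subscription and its emission.
fun traced(name: String, log: MutableList<String>): Single<String> =
    Single.fromCallable {
        log.add("$name emits")
        name
    }.doOnSubscribe { log.add("$name subscribed") }

// Runs a nested flatMap and returns the recorded order of events.
fun nestedOrder(): List<String> {
    val log = CopyOnWriteArrayList<String>()
    val outer = traced("outer", log)
    val inner = traced("inner", log)

    outer.flatMap { o ->
        // This lambda body runs only after `outer` has emitted `o`,
        // so `inner` cannot be subscribed any earlier than that.
        inner.map { i -> "$o/$i" }
    }.blockingGet()

    return log.toList()
}

fun main() {
    nestedOrder().forEach(::println)
}
```

In this sketch the inner Single is subscribed only after the outer one has emitted, which is the sequential behaviour described above.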

    Zip seems like the perfect solution here, and I'm not sure why you would want to use flatMap; I can't think of a way to make nested flatMaps subscribe in parallel. Zip also has a type-safe API, so you don't need the array-based API from your example.

    Singles
            .zip(s1, s4) { outer, inner -> service5(outer, inner) }
            .flatMap { it }
            .subscribe()
    

    Note that I used Singles from the RxKotlin artifact "io.reactivex.rxjava3:rxkotlin:3.0.0-RC1", because its zip overloads work better with Kotlin lambdas.
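If you would rather not add RxKotlin, the same idea can be sketched with plain RxJava 3 by passing an explicit `BiFunction`. The `zipInParallel` function and its local `source` helper are hypothetical names for this illustration. The `CyclicBarrier` is only there to show that both sources really are running at the same time: each source waits for the other, so the call can only succeed if zip subscribes to both up front.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.TimeUnit

fun zipInParallel(): String {
    // Both sources must be active at the same time to get past the barrier.
    val barrier = CyclicBarrier(2)

    // Hypothetical stand-in for service.execute().subscribeOn(io())
    fun source(value: String): Single<String> =
        Single.fromCallable {
            // Throws TimeoutException if the other source never starts.
            barrier.await(5, TimeUnit.SECONDS)
            value
        }.subscribeOn(Schedulers.io())

    return Single
        .zip(
            source("outer"),
            source("inner"),
            BiFunction<String, String, String> { outer, inner -> "$outer+$inner" }
        )
        .blockingGet()
}

fun main() {
    println(zipInParallel())
}
```

The typed `Single.zip` overloads stop at nine sources, so for the ten Singles in the question one option is to zip two smaller groups into intermediate values and then zip those results together; the Iterable-based overload from the question also works, at the cost of the casts.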