I'm a bit confused. Take these Singles as an example:
val s1 : Single<String> = service1.execute().subscribeOn(io())
val s2 : Single<Int> = service2.execute().subscribeOn(io())
val s3 : Single<Int> = service3.execute().subscribeOn(io())
val s4 : Single<String> = service4.execute().subscribeOn(io())
val ....
val s10 : Single<Int> = service10.execute().subscribeOn(io())
data class MyObj(val field1: String, val field2: Int, val field3: Int, val field4: String, ..., val field10: Int)
and I have a service10.execute(s1: String, s2: Int, s3: Int, s4: String)
If I do:
s1.flatMap { str ->
s2.flatMap { int1 ->
s3.flatMap { int2 ->
s4.flatMap { str2 ->
...
s10.flatMap { int10 ->
service10.execute(MyObj(str, int1, int2, str2..., int10))
}
}
}
}
}
Is the same as doing:
Single.zip(
listOf(
s1,
s2,
s3,
s4,
...,
...,
s10
)
) { array ->
val str = array[0] as String
val int1 = array[1] as Int
val int2 = array[2] as Int
val str2 = array[3] as String
...
val int10 = array[9] as Int
}
1) Are the nested flatMaps executed in parallel or sequentially? 2) If the nested flatMaps are sequential, is there a way to make them run in parallel, like the zip?
No, the nested flatMaps do not make the Singles run in parallel, as the following test shows. Its final assertion fails, because only one of the two Singles is subscribed before the timeout:
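import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit.SECONDS

// so we can be sure service1 and service2 are active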
val bothSubscribed = CountDownLatch(2)
// so we can simulate a blocking, long running operation on both services
val subscribeThreadsStillRunning = CountDownLatch(1)
val service5 = { str: String, str2: String ->
Observable.just("service5: $str, $str2").singleOrError()
}
val scheduler = Schedulers.io()
val createSingle = { value: String ->
Observable
.create<String> { emitter ->
println("subscribe $value on ${Thread.currentThread().name}")
bothSubscribed.countDown()
subscribeThreadsStillRunning.await(10, SECONDS)
emitter.onNext(value)
// singleOrError() only emits once the source has completed
emitter.onComplete()
}
.singleOrError()
.subscribeOn(scheduler)
}
val s1 = createSingle("outer")
val s4 = createSingle("inner")
s1.flatMap { outer ->
s4.flatMap { inner ->
service5(outer, inner)
}
}.subscribe()
assert(bothSubscribed.await(5, SECONDS))
subscribeThreadsStillRunning.countDown()
The reason can be understood by remembering that code within a lambda is not run until the lambda is executed (it seems obvious when put like that, but it took me a bit of thinking to get it). s4.flatMap is what triggers the subscription to s4, but this code doesn't execute until outer is available, i.e. until s1 has already emitted and is therefore complete.
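To make that ordering visible without any threads or latches, here is a minimal sketch that records events in a list. The `traced` helper and the `nestedFlatMapEvents` function are hypothetical names made up for this illustration, and `Single.fromCallable` is used only because its body runs at subscription time.

```kotlin
import io.reactivex.rxjava3.core.Single
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: the callable body runs only when the Single is subscribed,
// so the recorded event marks the moment of subscription.
fun traced(name: String, events: MutableList<String>): Single<String> =
    Single.fromCallable {
        events += "$name ran"
        name
    }

// Hypothetical demo function: returns the events in the order they happened.
fun nestedFlatMapEvents(): List<String> {
    val events = CopyOnWriteArrayList<String>()
    val outer = traced("outer", events)
    val inner = traced("inner", events)

    outer.flatMap { o ->
        // This lambda only runs after outer has emitted its value.
        events += "lambda entered with $o"
        // inner is subscribed here, not before.
        inner.flatMap { i -> Single.just("$o, $i") }
    }.blockingGet()

    return events
}

fun main() {
    // Prints the three events, one per line, in the order they occurred.
    nestedFlatMapEvents().forEach(::println)
}
```

The recorded order is "outer ran", "lambda entered with outer", "inner ran": the inner Single cannot start until the outer one has delivered its value.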
Zip seems like the perfect solution for this, and I'm not sure why you would want to use flatMap; I can't think of a way to make the nested flatMaps run in parallel. Zip also has a type-safe API, so you don't have to use the array-based API from your example.
Singles
.zip(s1, s4) { outer, inner -> service5(outer, inner) }
.flatMap { it }
.subscribe()
Note that I have used Singles from "io.reactivex.rxjava3:rxkotlin:3.0.0-RC1", as its lambdas work better with Kotlin.
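For comparison with the failing test above, here is a rough sketch of the same latch check run against zip, using only plain RxJava 3 (`Single.zip` with an explicit `BiFunction`) rather than rxkotlin. The `zipSubscribesToBoth` function and the `blocking` helper are hypothetical names for this illustration, and the sources are combined into a simple String instead of calling service5, to keep it short.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.functions.BiFunction
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit.SECONDS

// Hypothetical demo function: returns true if both sources were subscribed
// while neither had emitted yet, i.e. if they were active at the same time.
fun zipSubscribesToBoth(): Boolean {
    val bothSubscribed = CountDownLatch(2)
    val release = CountDownLatch(1)

    // Simulates a blocking, long-running call on an io() thread.
    val blocking = { value: String ->
        Single.fromCallable {
            bothSubscribed.countDown()
            release.await(10, SECONDS)
            value
        }.subscribeOn(Schedulers.io())
    }

    // zip subscribes to all of its sources as soon as it is subscribed itself.
    Single.zip(
        blocking("outer"),
        blocking("inner"),
        BiFunction { outer: String, inner: String -> "$outer, $inner" }
    ).subscribe()

    val result = bothSubscribed.await(5, SECONDS)
    release.countDown() // let both sources finish
    return result
}

fun main() {
    println(zipSubscribesToBoth())
}
```

Here the latch reaches zero, because both sources are subscribed up front and each blocks on its own io() thread.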