multicasting occurs when all my subscribers recieve the same emission before moving onto the next emission. But when i use the share command i am not seeing multicasting. i have an expensive operation i want only done once. lets take a look at this code:
var ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}
fun doMultiplyBy2(){
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
ob1.share()
}
}
here is the actual output:
expensive operation
1st subscriber: 2;
expensive operation
1st subscriber: 4;
expensive operation
1st subscriber: 6;
expensive operation
1st subscriber: 8;
expensive operation
1st subscriber: 10;
expensive operation
2nd subscriber: 2;
expensive operation
2nd subscriber: 4;
expensive operation
2nd subscriber: 6;
expensive operation
2nd subscriber: 8;
expensive operation
2nd subscriber: 10;
but why is it repeating the expensive operation before emitting to all subscribers. its repeating the expensive operation for each subscriber? i am using share so i am expecting the output to be like this:
expensive operation
1st subscriber: 2;
2nd subscriber: 2;
expensive operation
1st subscriber: 4;
2nd subscriber: 4;
expensive operation
1st subscriber: 6;
2nd subscriber: 6;
expensive operation
1st subscriber: 8;
2nd subscriber: 8;
expensive operation
1st subscriber: 10;
2nd subscriber: 10;
what else is interesting is that i found that the expected output only occurs if i do the following:
var ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}.publish()
fun doMultiplyBy2(){
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
ob1.connect()
}
}
thus making it a connectable observable and then connecting manually. Why does share not work ?
UPDATE: I want to make it very clear what the issue is:
share should be the same as publish().refCount() and i also thought that share would multicast for me but im not seeing it doing that. lets take a look at not using share but rather using publish and connect manually:
var ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}.publish()
fun doMultiplyBy2(){
//ob1 = ob1.share()
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
ob1.connect()
}
}
the output of this is:
expensive operation
1st subscriber: 2;
2nd subscriber: 2;
expensive operation
1st subscriber: 4;
2nd subscriber: 4;
expensive operation
1st subscriber: 6;
2nd subscriber: 6;
expensive operation
1st subscriber: 8;
2nd subscriber: 8;
expensive operation
1st subscriber: 10;
2nd subscriber: 10;
which is exactly what i want. expensive operation done once per emission.
not lets change it to use share:
var ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}.publish().refCount()//or can use share()
fun doMultiplyBy2(){
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
}
}
which yields the following output:
expensive operation
1st subscriber: 2;
expensive operation
1st subscriber: 4;
expensive operation
1st subscriber: 6;
expensive operation
1st subscriber: 8;
expensive operation
1st subscriber: 10;
expensive operation
2nd subscriber: 2;
expensive operation
2nd subscriber: 4;
expensive operation
2nd subscriber: 6;
expensive operation
2nd subscriber: 8;
expensive operation
2nd subscriber: 10;
what then is the purpose of publish().refCount() if its not multicasting its just like an ordinary observable. whats the point of it or share ??
As you know share
operator is the same publish().refCount()
. Refcount
makes connectable observer
as you know. So your code is correct. But you have a missing thing that is Thread
. I think you can understand what I want to explain about it. If not let me know!
Change the code like this
val ob1 = Observable.fromArray(1,2,3,4,5).map {
println("expensive operation")
it * 2
}.subscribeOn(Schedulers.computation()).share()
// Add subscribeOn operator to change emitting thread from MAIN to WORK
fun doMultiplyBy2() {
ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
}
doMultiplyBy2()
Thread.sleep(1000) // Waiting for ending to execute
output
expensive operation
1st subscriber: 2;
2nd subscriber: 2;
expensive operation
1st subscriber: 4;
2nd subscriber: 4;
expensive operation
1st subscriber: 6;
2nd subscriber: 6;
expensive operation
1st subscriber: 8;
2nd subscriber: 8;
expensive operation
1st subscriber: 10;
2nd subscriber: 10;