
Why is the RxJava 2 share operator not multicasting?


Multicasting occurs when all of my subscribers receive the same emission before the source moves on to the next one. But when I use the share operator, I am not seeing multicasting. I have an expensive operation that I want performed only once per emission. Let's take a look at this code:

var  ob1 = Observable.fromArray(1,2,3,4,5).map {
       println("expensive operation")
       it * 2
   }

    fun doMultiplyBy2(){
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}

        ob1.share()
    }
}

Here is the actual output:

expensive operation
 1st subscriber: 2;
expensive operation
 1st subscriber: 4;
expensive operation
 1st subscriber: 6;
expensive operation
 1st subscriber: 8;
expensive operation
 1st subscriber: 10;
expensive operation
 2nd subscriber: 2;
expensive operation
 2nd subscriber: 4;
expensive operation
 2nd subscriber: 6;
expensive operation
 2nd subscriber: 8;
expensive operation
 2nd subscriber: 10;

But why is the expensive operation repeated for each subscriber, instead of running once per emission and then being delivered to all subscribers? I am using share, so I expected the output to look like this:

expensive operation
 1st subscriber: 2;
 2nd subscriber: 2;
expensive operation
 1st subscriber: 4;
 2nd subscriber: 4;
expensive operation
 1st subscriber: 6;
 2nd subscriber: 6;
expensive operation
 1st subscriber: 8;
 2nd subscriber: 8;
expensive operation
 1st subscriber: 10;
 2nd subscriber: 10;

What is also interesting is that I get the expected output only if I do the following:

var  ob1 = Observable.fromArray(1,2,3,4,5).map {
       println("expensive operation")
       it * 2
   }.publish()

    fun doMultiplyBy2(){
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}

        ob1.connect()
    }
}

That is, I make it a connectable observable and then connect manually. Why does share not work?

UPDATE: I want to make it very clear what the issue is:

share should be the same as publish().refCount(), and I also thought that share would multicast for me, but I am not seeing it do that. Let's first look at using publish and connecting manually instead of using share:

 var  ob1 = Observable.fromArray(1,2,3,4,5).map {
       println("expensive operation")
       it * 2
   }.publish()

    fun doMultiplyBy2(){
        //ob1 = ob1.share()
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}

        ob1.connect()
    }
}

The output of this is:

expensive operation
 1st subscriber: 2;
 2nd subscriber: 2;
expensive operation
 1st subscriber: 4;
 2nd subscriber: 4;
expensive operation
 1st subscriber: 6;
 2nd subscriber: 6;
expensive operation
 1st subscriber: 8;
 2nd subscriber: 8;
expensive operation
 1st subscriber: 10;
 2nd subscriber: 10;

This is exactly what I want: the expensive operation is done once per emission.

Now let's change it to use share:

var  ob1 = Observable.fromArray(1,2,3,4,5).map {
       println("expensive operation")
       it * 2
   }.publish().refCount()//or can use share()

    fun doMultiplyBy2(){
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}

        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
    }
}

This yields the following output:

expensive operation
 1st subscriber: 2;
expensive operation
 1st subscriber: 4;
expensive operation
 1st subscriber: 6;
expensive operation
 1st subscriber: 8;
expensive operation
 1st subscriber: 10;
expensive operation
 2nd subscriber: 2;
expensive operation
 2nd subscriber: 4;
expensive operation
 2nd subscriber: 6;
expensive operation
 2nd subscriber: 8;
expensive operation
 2nd subscriber: 10;

What, then, is the purpose of publish().refCount() if it does not multicast and behaves just like an ordinary observable? What is the point of it, or of share?
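
A self-contained way to reproduce the publish().refCount() case is sketched below. It assumes RxJava 2.x is on the classpath. The helper name runWithRefCount and the marker line are illustrative and not part of the code above. Instead of printing directly, the sketch records each line in a list and adds a marker when the first subscribe call returns, which shows where the second subscriber joins relative to the first one's emissions.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: runs the publish().refCount() pipeline and returns
// every line it would have printed, in the order the events happened.
fun runWithRefCount(): List<String> {
    val log = mutableListOf<String>()

    val ob1 = Observable.fromArray(1, 2, 3, 4, 5)
        .map {
            log.add("expensive operation")
            it * 2
        }
        .publish()
        .refCount() // same as share()

    ob1.subscribe { log.add(" 1st subscriber: $it;") }
    // Marker (an addition for diagnosis): recorded once the first subscribe call has returned.
    log.add("first subscribe() returned")
    ob1.subscribe { log.add(" 2nd subscriber: $it;") }

    return log
}

fun main() {
    runWithRefCount().forEach(::println)
}
```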


Solution

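  • As you know, the share operator is the same as publish().refCount(), and refCount() connects the underlying connectable observable as soon as the first subscriber arrives. So your code is correct, but one thing is missing: a thread. Observable.fromArray emits synchronously on the thread that subscribes, so the first subscribe call delivers all five items and completes before the second subscribe line even runs. The second subscriber therefore finds no active connection and triggers a fresh one, which repeats the expensive operation. If the source emits on another thread, subscribe returns immediately and the second subscriber can join the same connection. Note that this relies on timing: the second subscriber has to attach before the worker thread starts emitting, which is usually the case here but is not strictly guaranteed.

    Change the code like this: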

    import io.reactivex.Observable
    import io.reactivex.schedulers.Schedulers

    val ob1 = Observable.fromArray(1,2,3,4,5).map {
        println("expensive operation")
        it * 2
    }.subscribeOn(Schedulers.computation()).share()
    // subscribeOn moves the emission from the calling (main) thread to a computation worker thread
    
    fun doMultiplyBy2() {
        ob1.flatMap { Observable.just(" 1st subscriber: $it;") }.subscribe{println(it)}
    
        ob1.flatMap { Observable.just(" 2nd subscriber: $it;") }.subscribe{println(it)}
    }
    
    doMultiplyBy2()
    
    Thread.sleep(1000) // Keep the main thread alive until the worker thread has finished emitting
    

    Output:

    expensive operation
     1st subscriber: 2;
     2nd subscriber: 2;
    expensive operation
     1st subscriber: 4;
     2nd subscriber: 4;
    expensive operation
     1st subscriber: 6;
     2nd subscriber: 6;
    expensive operation
     1st subscriber: 8;
     2nd subscriber: 8;
    expensive operation
     1st subscriber: 10;
     2nd subscriber: 10;
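
If depending on thread timing is a concern, one possible alternative (not part of the answer above) is publish().autoConnect(2), which stays on the calling thread and connects to the source only once the second subscriber has subscribed. The sketch below assumes RxJava 2.x and a fixed, known number of subscribers. The helper name runWithAutoConnect is hypothetical, and lines are collected in a list rather than printed so the order can be inspected.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: multicasts a synchronous source to exactly two subscribers
// without switching threads, and returns the lines in the order they were produced.
fun runWithAutoConnect(): List<String> {
    val log = mutableListOf<String>()

    val ob1 = Observable.fromArray(1, 2, 3, 4, 5)
        .map {
            log.add("expensive operation")
            it * 2
        }
        .publish()
        .autoConnect(2) // assumption: exactly two subscribers are expected

    // Nothing is emitted yet: only one of the two required subscribers is present.
    ob1.subscribe { log.add(" 1st subscriber: $it;") }
    // The second subscription triggers the connection; both subscribers share each emission.
    ob1.subscribe { log.add(" 2nd subscriber: $it;") }

    return log
}

fun main() {
    runWithAutoConnect().forEach(::println)
}
```

Unlike refCount(), autoConnect connects once and does not disconnect or reconnect as subscribers come and go, so it fits best when the set of subscribers is known up front.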