kotlin rx-kotlin2

switchOnNext operator does not emit for subscriptions after last inserted observable


First, some background (maybe there is a better way of doing this):

We have a module that emits incoming Bluetooth messages on a specific Observable. We then process these messages and finally subscribe at the end to send them forward. This processing can change at runtime, which for most stages means recreating the intermediate Observable and every observable that depends on it (as they would otherwise be processing invalid data).

We would like to change it so recreating some part of the processing does not necessitate recreating everything that depends on it, mostly so we don't have to remember at all times what depended on what, and also so that operators with internal state (like buffer, scan or debounce) don't lose this internal state.

The promising solution:

By using a switchOnNext operator, we would solve this problem. Whenever an intermediate observable is recreated, we just add that to the origin of the switchOnNext and whoever subscribed to the output of the switchOnNext would instantly get the new results.
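
A minimal sketch of the idea, to make it concrete. The names `raw` and `stages` and the `map`/`scan` steps are made up for illustration (our real pipeline is more involved): the downstream `scan` is built once, and swapping the intermediate observable does not reset its running sum.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

// Hypothetical names: `raw` stands in for the Bluetooth source,
// `stages` for the observable of observables feeding switchOnNext.
fun runSwappableStage(): List<Int> {
    val raw = PublishSubject.create<Int>()
    val stages = PublishSubject.create<Observable<Int>>()
    val results = mutableListOf<Int>()

    // Downstream keeps a running sum; it is built once and never recreated.
    Observable.switchOnNext(stages)
        .scan { acc: Int, value: Int -> acc + value }
        .subscribe { results.add(it) }

    stages.onNext(raw.map { it * 2 })   // first version of the processing
    raw.onNext(1)                       // stage emits 2, running sum is 2
    raw.onNext(2)                       // stage emits 4, running sum is 6

    stages.onNext(raw.map { it * 10 })  // swap the processing
    raw.onNext(3)                       // stage emits 30, running sum is 36

    return results
}

fun main() {
    println(runSwappableStage())
}
```

Here the subscriber is attached before any stage is pushed, so it sees both versions of the processing and the sum carries over the swap.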

The problem:

If the processing after the switchOnNext has to change, the new subscriber will not get any results until the upstream observable is replaced again. This means we now have the opposite problem: whenever some part changes, we have to recreate everything that it depends on, recursively. This is slightly better (it is much easier to keep track of what something depends on than of everything that depends on it), but observables still lose their internal state, as they have to be recreated.

This behavior seems to contradict what the documentation implies, although the documentation does not explicitly say one way or the other.

Example code:

This code demonstrates the problem.

import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

fun main() {
    //Observable of observables
    val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
    //Observable to subscribe to get the most recent values
    val observable: Observable<Int> = Observable.switchOnNext(publishSubject)

    observable.subscribe { println("1: $it") }
    //Now 1 is subscribed

    val obsAux1 = PublishSubject.create<Int>()

    observable.subscribe { println("2: $it") }
    //Now 1 and 2 are subscribed

    publishSubject.onNext(obsAux1)

    observable.subscribe { println("3: $it") }
    //Now 1, 2 and 3 are subscribed

    //Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
    obsAux1.onNext(1)

    val obsAux2 = PublishSubject.create<Int>()

    publishSubject.onNext(obsAux2)

    observable.subscribe { println("4: $it") }
    //Now 1, 2, 3 and 4 are subscribed

    //Should not print anything
    obsAux1.onNext(2)
    //Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
    obsAux2.onNext(3)
} 

Output of this code:

1: 1
2: 1
1: 3
2: 3
3: 3

Expected output:

1: 1
2: 1
3: 1 <--- This is missing
1: 3
2: 3
3: 3
4: 3 <--- This is missing

The first time obsAux1 emits, all three subscriptions should print, but only the ones made before obsAux1 was passed to publishSubject do.

The second time obsAux1 emits, nothing should print, as obsAux2 has already been inserted. This works as expected.

The first time obsAux2 emits, all four subscriptions should print. The third subscription prints as expected, which shows that the subscription itself worked fine. But the fourth subscription does not print anything, as it was added after obsAux2 was passed to publishSubject.


Solution

  • The solution is just to use a BehaviorSubject instead of a PublishSubject, at least for the observable of observables.

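    The difference between the two is that, on a new subscription, a PublishSubject only emits subsequent elements, while a BehaviorSubject immediately emits the most recent element and then continues normally. A late subscriber to the switchOnNext output therefore receives the current inner observable right away and subscribes to it.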

    I still don't agree that this is how it should work, but it solves our problem anyway.

    Code, in case somebody needs it (the only changes are the first line of main and an extra import):

    import io.reactivex.Observable
    import io.reactivex.subjects.BehaviorSubject
    import io.reactivex.subjects.PublishSubject
    
    fun main() {
        //Observable of observables
        val publishSubject: BehaviorSubject<Observable<Int>> = BehaviorSubject.create()
        //Observable to subscribe to get the most recent values
        val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
    
        observable.subscribe { println("1: $it") }
        //Now 1 is subscribed
    
        val obsAux1 = PublishSubject.create<Int>()
    
        observable.subscribe { println("2: $it") }
        //Now 1 and 2 are subscribed
    
        publishSubject.onNext(obsAux1)
    
        observable.subscribe { println("3: $it") }
        //Now 1, 2 and 3 are subscribed
    
        //Now prints from subscriptions 1, 2 and 3
        obsAux1.onNext(1)
    
        val obsAux2 = PublishSubject.create<Int>()
    
        publishSubject.onNext(obsAux2)
    
        observable.subscribe { println("4: $it") }
        //Now 1, 2, 3 and 4 are subscribed
    
        //Should not print anything
        obsAux1.onNext(2)
        //Now prints from subscriptions 1, 2, 3 and 4
        obsAux2.onNext(3)
    }
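
To isolate the difference between the two subject types, here is a small sketch outside of switchOnNext. The helper `lateSubscriberSees` and the string values are hypothetical, made up for this illustration: one observer subscribes before the first emission and one after it, and the function records what each of them receives.

```kotlin
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject
import io.reactivex.subjects.Subject

// Hypothetical helper: subscribes one observer before and one after the
// first emission, and records who saw which value.
fun lateSubscriberSees(subject: Subject<String>): List<String> {
    val seen = mutableListOf<String>()
    subject.subscribe { seen.add("early: $it") }
    subject.onNext("a")
    subject.subscribe { seen.add("late: $it") }  // subscribes after "a" was emitted
    subject.onNext("b")
    return seen
}

fun main() {
    // The late observer never sees "a"
    println(lateSubscriberSees(PublishSubject.create<String>()))
    // The late observer gets "a" replayed on subscription
    println(lateSubscriberSees(BehaviorSubject.create<String>()))
}
```

With the PublishSubject the result is [early: a, early: b, late: b]; with the BehaviorSubject it is [early: a, late: a, early: b, late: b]. In the switchOnNext case the replayed element is the current inner observable, which is why late subscriptions start receiving values.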