We have a module that emits incoming Bluetooth messages on a specific Observable. We then process this messages, and finally subscribe at the end to send messages forward. This processing could change at some time, which for most of the processing means recreating the intermediate Observables, and all observables that depend on it (as they would be processing invalid data now).
We would like to change it so recreating some part of the processing does not necessitate recreating everything that depends on it, mostly so we don't have to remember at all times what depended on what, and also so that operators with internal state (like buffer, scan or debounce) don't lose this internal state.
By using a switchOnNext operator, we would solve this problem. Whenever an intermediate observable is recreated, we just add that to the origin of the switchOnNext and whoever subscribed to the output of the switchOnNext would instantly get the new results.
If the processing after the switchOnNext has to change, it will stop getting results until the previous observable changes. This means that we have the oposite problem now. whenever some part changes, we have to recreate everything that it depends on, recursively. This is slightly better (much easier to keep track on what something depends on than everything that depends on it), but observables still lose the internal state as they have to be recreated.
This behavior seems to be against what the documentation says should happen, but it does not explicitly says one way or another.
This code demonstrates the problem.
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
fun main() {
//Observable of observables
val publishSubject: PublishSubject<Observable<Int>> = PublishSubject.create()
//Observable to subscribe to get the most recent values
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }
//Now 1 is subscribed
val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
//Now 1 and 2 are subscribed
publishSubject.onNext(obsAux1)
observable.subscribe { println("3: $it") }
//Now 1, 2 and 3 are subscribed
//Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
obsAux1.onNext(1)
val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
//Now 1, 2, 3 and 4 are subscribed
//Should not print anything
obsAux1.onNext(2)
//Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
obsAux2.onNext(3)
}
Output of this code:
1: 1
2: 1
1: 3
2: 3
3: 3
Expected output:
1: 1
2: 1
3: 1 <--- This is missing
1: 3
2: 3
3: 3
4: 3 <--- This is missing
The first time obsAux1 emits, all three subscriptions should print, but only the ones before it was added to publishSubject print out.
The second time obsAux1 emits, nothing should print as obsAux2 has been inserted already. This works as expected
The first time obsAux2 emits, all four subscriptions should print. The third subscription prints as expected, which should that the subscription worked fine. But the fourth subscription is not printing anything, as it was added after obsAux2 was inserted to publishSubject.
The solution is just to use a BehaviourSubject instead of a PublishSubject, at least for the observable of observables.
The diference between the two is that on a new subscription, a PublishSubject will only emit further elements, while a BehaviourSubject inmediatly emit the last element and continue normally.
I still dont agree that this is how it should work, but it solves our problem anyway.
Code, just in case somebody needs it (just a change in the first line of main and an extra import):
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject
fun main() {
//Observable of observables
val publishSubject: BehaviorSubject<Observable<Int>> = BehaviorSubject.create()
//Observable to subscribe to get the most recent values
val observable: Observable<Int> = Observable.switchOnNext(publishSubject)
observable.subscribe { println("1: $it") }
//Now 1 is subscribed
val obsAux1 = PublishSubject.create<Int>()
observable.subscribe { println("2: $it") }
//Now 1 and 2 are subscribed
publishSubject.onNext(obsAux1)
observable.subscribe { println("3: $it") }
//Now 1, 2 and 3 are subscribed
//Should print out from subscriptions 1, 2 and 3, but only 1 and 2 printed
obsAux1.onNext(1)
val obsAux2 = PublishSubject.create<Int>()
publishSubject.onNext(obsAux2)
observable.subscribe { println("4: $it") }
//Now 1, 2, 3 and 4 are subscribed
//Should not print anything
obsAux1.onNext(2)
//Should print out from subscriptions 1, 2, 3 and 4, but only 1, 2 and 3 printed
obsAux2.onNext(3)
}