In my system I have a source, two "steps" that map the source to a new value, and then a sum that combines those two steps to create a final value. The initial run through of this system works as I hoped, generating a single sum of 3.
var source = new Rx.BehaviorSubject(0);

var stepOne = source.map(function (value) {
    return value + 1;
});

var stepTwo = source.map(function (value) {
    return value + 2;
});

var sum = Rx.Observable.combineLatest(
    stepOne,
    stepTwo,
    function (s1, s2) {
        console.log('calc sum: ' + (s1 + s2));
        return s1 + s2;
    }).subscribe(function (sum) {
    });
Outputs:
> calc sum: 3
But if I then put in a new value for source I get two results like this:
source.onNext(1);
> calc sum: 4
> calc sum: 5
The first is an intermediate result… as the new source value passes through one part of the system, and then I get the final result when all values have finished propagating.
So my question is: what's the recommended way to configure things so that a new value pushed into source passes through the system atomically and generates only one sum result?
Thanks!
That's how combineLatest works. It is indeed confusing, since it allows these temporarily inconsistent states, as you pointed out. The key thing to learn about combineLatest is that, once every source has emitted at least one item, it emits a new item whenever any one of its sources emits a new item. It does so with only part of the update applied, because it has no "waiting" mechanism that lets the other sources catch up first.
In diagrams, http://rxmarbles.com/#combineLatest.
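To make the intermediate result concrete, here is a minimal sketch that replays the question's diamond through a simplified model of combineLatest. It is plain JavaScript, not the RxJS implementation: combineLatestModel and diamondEvents are hypothetical helpers, and the sketch assumes each source value reaches stepOne before stepTwo, as in the question's output.

```javascript
// Simplified model of combineLatest over a recorded timeline of events.
// Assumption: each event is { input: <index>, value: <any> }; this is not RxJS code.
function combineLatestModel(inputCount, events, combine) {
  var latest = new Array(inputCount);
  var hasValue = new Array(inputCount).fill(false);
  var outputs = [];
  events.forEach(function (event) {
    latest[event.input] = event.value;
    hasValue[event.input] = true;
    // Emit on every event once each input has produced at least one value.
    if (hasValue.every(Boolean)) {
      outputs.push(combine.apply(null, latest));
    }
  });
  return outputs;
}

// The diamond from the question: each source value reaches stepOne, then stepTwo.
function diamondEvents(sourceValues) {
  var events = [];
  sourceValues.forEach(function (value) {
    events.push({ input: 0, value: value + 1 }); // stepOne
    events.push({ input: 1, value: value + 2 }); // stepTwo
  });
  return events;
}

var sums = combineLatestModel(2, diamondEvents([0, 1]), function (s1, s2) {
  return s1 + s2;
});
console.log(sums); // [3, 4, 5]
```

The 4 in the middle is the inconsistent state: stepOne already reflects the new source value 1 while stepTwo still holds the value derived from 0.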
What you probably want is the zip operator. Zip waits for its inputs to emit items that match with each other. In other words, zip's output emits its n-th item once all the n-th items from all inputs have been emitted. It is ideal for this diamond case, where source feeds both stepOne and stepTwo and you want to combine stepOne and stepTwo.
In diagrams, http://rxmarbles.com/#zip.
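With the question's code, the change would be to call Rx.Observable.zip(stepOne, stepTwo, function (s1, s2) { ... }) in place of Rx.Observable.combineLatest. The sketch below illustrates why that removes the intermediate sum, using a simplified plain-JavaScript model rather than RxJS itself. zipModel and diamondEvents are hypothetical helpers, and the event order (stepOne before stepTwo for each source value) is an assumption taken from the question's output.

```javascript
// Simplified model of zip over a recorded timeline of events.
// Assumption: each event is { input: <index>, value: <any> }; this is not RxJS code.
function zipModel(inputCount, events, combine) {
  var queues = [];
  for (var i = 0; i < inputCount; i++) {
    queues.push([]);
  }
  var outputs = [];
  events.forEach(function (event) {
    queues[event.input].push(event.value);
    // Emit only when every input has an unmatched value waiting.
    var allReady = queues.every(function (queue) {
      return queue.length > 0;
    });
    if (allReady) {
      var args = queues.map(function (queue) {
        return queue.shift();
      });
      outputs.push(combine.apply(null, args));
    }
  });
  return outputs;
}

// The diamond from the question: each source value reaches stepOne, then stepTwo.
function diamondEvents(sourceValues) {
  var events = [];
  sourceValues.forEach(function (value) {
    events.push({ input: 0, value: value + 1 }); // stepOne
    events.push({ input: 1, value: value + 2 }); // stepTwo
  });
  return events;
}

var sums = zipModel(2, diamondEvents([0, 1]), function (s1, s2) {
  return s1 + s2;
});
console.log(sums); // [3, 5]
```

Each source value now produces exactly one sum, because the n-th value of stepOne is held back until the n-th value of stepTwo arrives.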
Keep in mind that zip assumes the inputs emit with the same frequency, since it pairs items by position. If the inputs you are combining emit at different rates, zip will match a new item from one input with an old, still-unmatched item from the other. In that case you need to use combineLatest.
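As a rough illustration of that trade-off, the sketch below runs one made-up timeline through both pairing strategies. It is a plain-JavaScript model, not RxJS: the events array, zipPairs, latestPairs, and add are all hypothetical, with input 0 standing for a fast input and input 1 for a slow one.

```javascript
// Hypothetical timeline: input 0 ("fast") emits three times, input 1 ("slow") twice.
var events = [
  { input: 0, value: 10 },
  { input: 0, value: 20 },
  { input: 0, value: 30 },
  { input: 1, value: 1 },
  { input: 1, value: 2 }
];

function add(a, b) {
  return a + b;
}

// zip-style pairing: the n-th value of one input with the n-th value of the other.
function zipPairs(timeline, combine) {
  var queues = [[], []];
  var outputs = [];
  timeline.forEach(function (event) {
    queues[event.input].push(event.value);
    if (queues[0].length > 0 && queues[1].length > 0) {
      outputs.push(combine(queues[0].shift(), queues[1].shift()));
    }
  });
  return { outputs: outputs, unmatched: queues };
}

// combineLatest-style pairing: always the most recent value of each input.
function latestPairs(timeline, combine) {
  var latest = [];
  var seen = [false, false];
  var outputs = [];
  timeline.forEach(function (event) {
    latest[event.input] = event.value;
    seen[event.input] = true;
    if (seen[0] && seen[1]) {
      outputs.push(combine(latest[0], latest[1]));
    }
  });
  return outputs;
}

var zipped = zipPairs(events, add);
var combined = latestPairs(events, add);
console.log(zipped.outputs);   // [11, 22]
console.log(zipped.unmatched); // [[30], []]
console.log(combined);         // [31, 32]
```

In this model zip pairs the slow input's values with the fast input's oldest unmatched values (10 and 20) and leaves 30 waiting, while the combineLatest-style pairing always uses the newest fast value.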