I'm looking for a way to make combined streams notify only once a change has reached all of them. Note: this is the simplest form of my problem; in reality I'm combining 8 different streams, some of which are intertwined, some async, etc. :(
import { BehaviorSubject, map, combineLatest } from 'rxjs';
const $A = new BehaviorSubject(1)
const $B = $A.pipe(map(val => `$B : ${val}`))
const $C = $A.pipe(map(val => `$C : ${val}`))
// prints out:
// (1) [1, "$B : 1", "$C : 1"]
combineLatest([$A,$B,$C]).subscribe(console.log)
$A.next(2)
// prints out:
// (2) [2, "$B : 1", "$C : 1"]
// (3) [2, "$B : 2", "$C : 1"]
// (4) [2, "$B : 2", "$C : 2"]
Printout (1) is great, all streams have a value of "1": [1, "$B : 1", "$C : 1"]
Printout (4) is great too, all streams have a value of "2": [2, "$B : 2", "$C : 2"]
But combineLatest also fires for (2) and (3), after each stream is updated individually, which means you get a mixture of "1" and "2".
**How can I modify the code so that I only get notified once a change has fully propagated?**
My best solutions so far:
A) Using debounceTime(100)
combineLatest([$A,$B,$C]).pipe(debounceTime(100)).subscribe(console.log)
But it's flaky: it can either swallow valid states if they are processed too quickly, or notify with invalid states if individual pipes are too slow.
B) Filter only valid states
combineLatest([$A,$B,$C]).pipe(
filter(([a,b,c])=>{
return b.indexOf(a) > -1 && c.indexOf(a) > -1
})
).subscribe(console.log)
This works, but adding a validation function seems like the wrong way to do it (and more work :))
C) Make B$ and C$ subjects, pushing the latest value into them and resetting them on every change
A$.pipe(tap(val => {
B$.next(undefined);
B$.next(val);
C$.next(undefined);
C$.next(val);
}))
...
combineLatest([$A,$B.pipe(filter(b => !!b)),$C.pipe(filter(c => !!c))]).pipe(
filter(([a,b,c])=>{
return b.indexOf(a) > -1 && c.indexOf(a) > -1
})
)
Works but quite a lot of extra code and vars
I have the feeling I'm missing a concept or not seeing how to achieve this in a clean/robust way, but I'm sure I'm not the first one :)
Thanks
As you've observed, the observable created by combineLatest will emit when any of its sources emit.
Your problem is occurring because you pass multiple observables into combineLatest that share a common source. So whenever that common source emits, it causes each derived observable to emit.
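To make the ordering concrete, here is a rough, dependency-free model of what happens. MiniSubject and emit are hypothetical names invented for this sketch; it is not how RxJS is implemented, it only assumes that a subject notifies its subscribers synchronously, one after another, in subscription order:

```typescript
// A deliberately tiny, synchronous stand-in for a BehaviorSubject (NOT RxJS itself).
type Listener<T> = (value: T) => void;

class MiniSubject<T> {
  private listeners: Listener<T>[] = [];
  private current: T;

  constructor(initial: T) {
    this.current = initial;
  }

  // Replays the current value, then registers the listener.
  subscribe(listener: Listener<T>): void {
    listener(this.current);
    this.listeners.push(listener);
  }

  // Notifies listeners one by one, in subscription order.
  next(value: T): void {
    this.current = value;
    for (const listener of this.listeners) listener(value);
  }
}

const log: string[] = [];
const a = new MiniSubject(1);

// Latest value seen from each "stream"; B and C are both derived from a.
let latestA: number | undefined;
let latestB: string | undefined;
let latestC: string | undefined;

// Records the combined state once every slot has a value, like combineLatest does.
const emit = (): void => {
  if (latestA !== undefined && latestB !== undefined && latestC !== undefined) {
    log.push(JSON.stringify([latestA, latestB, latestC]));
  }
};

a.subscribe(v => { latestA = v; emit(); });
a.subscribe(v => { latestB = `B : ${v}`; emit(); });
a.subscribe(v => { latestC = `C : ${v}`; emit(); });

a.next(2);
console.log(log);
```

A single a.next(2) reaches the three listeners in turn, and the combined state is recorded after each one, which is where the intermediate mixed states come from.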
One way to "fix" this in a synchronous scenario is to simply apply debounceTime(0), which masks the intermediate emissions that happen within the same turn of the event loop. This approach is a bit naive, but works in simple scenarios:
combineLatest([$A,$B,$C]).pipe(
debounceTime(0)
)
But since you have some async things going on, I think your solution is to not include duplicate sources inside combineLatest, and to handle the logic further down the chain:
combineLatest([$A]).pipe(
map(([val]) => [
val,
`$B : ${val}`,
`$C : ${val}`,
])
)
The code above produces the desired output. Obviously, you wouldn't need combineLatest with a single source, but the idea is the same if you had multiple sources.
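As a sketch of the multi-source case, assuming RxJS 7.2+ (where operators can be imported from 'rxjs') and a made-up second source $suffix that is not part of the original question:

```typescript
import { BehaviorSubject, combineLatest, map } from 'rxjs';

const $A = new BehaviorSubject(1);
// Hypothetical second, independent source, added only for illustration.
const $suffix = new BehaviorSubject('!');

const emissions: Array<[number, string, string]> = [];

// Only independent sources go into combineLatest;
// the B and C values are derived afterwards, in a single map.
combineLatest([$A, $suffix]).pipe(
  map(([a, suffix]): [number, string, string] => [
    a,
    `$B : ${a}${suffix}`,
    `$C : ${a}${suffix}`,
  ])
).subscribe(state => emissions.push(state));

$A.next(2);
$suffix.next('?');
console.log(emissions);
```

Each change to $A or $suffix produces exactly one emission here, and every emitted tuple is built from the same value of $A.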
Let's use a more concrete example that has the same issue:
const userId$ = new ReplaySubject<string>(1);
const maxMsgCount$ = new BehaviorSubject(2);
const details$ = userId$.pipe(switchMap(id => getDetails(id)));
const messages$ = combineLatest([userId$, maxMsgCount$]).pipe(
switchMap(([id, max]) => getMessages(id, max))
);
const user$ = combineLatest([userId$, details$, messages$]).pipe(
map(([id, details, messages]) => ({
id,
age: details.age,
name: details.name,
messages
}))
);
Notice that when userId$ emits a new value, the user$ observable ends up emitting values that have the new userId but the details from the old user!
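The mismatch can be reproduced with a small sketch. getDetails and getMessages below are hypothetical synchronous stand-ins (the real ones would presumably be async requests), and the Details shape with an id field is an assumption made only so the mismatch can be detected; RxJS 7.2+ is assumed for the imports:

```typescript
import { BehaviorSubject, Observable, ReplaySubject, combineLatest, of, switchMap } from 'rxjs';

// Hypothetical shape and stand-in lookups, for illustration only.
interface Details { id: string; name: string; age: number }
const getDetails = (id: string): Observable<Details> =>
  of({ id, name: `name-${id}`, age: 30 });
const getMessages = (id: string, max: number): Observable<string[]> =>
  of([`message for ${id}`].slice(0, max));

const userId$ = new ReplaySubject<string>(1);
const maxMsgCount$ = new BehaviorSubject(2);
const details$ = userId$.pipe(switchMap(id => getDetails(id)));
const messages$ = combineLatest([userId$, maxMsgCount$]).pipe(
  switchMap(([id, max]) => getMessages(id, max))
);

const mismatches: string[] = [];
combineLatest([userId$, details$, messages$]).subscribe(([id, details]) => {
  // Record every emission whose details belong to a different user.
  if (details.id !== id) {
    mismatches.push(`${id} paired with details of ${details.id}`);
  }
});

userId$.next('u1');
userId$.next('u2');
console.log(mismatches);
```

With these stand-ins, switching from 'u1' to 'u2' records a combination of the new id with the previous user's details, because the userId$ slot is updated before details$ has re-emitted.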
We can prevent this by only including unique sources in our combineLatest:
const userId$ = new ReplaySubject<string>(1);
const maxMsgCount$ = new BehaviorSubject(2);
const user$ = combineLatest([userId$, maxMsgCount$]).pipe(
switchMap(([id, max]) => combineLatest([getDetails(id), getMessages(id, max)]).pipe(
map(([details, messages]) => ({
id,
age: details.age,
name: details.name,
messages
}))
))
);
You can see this behavior in action in the StackBlitz samples below: