I would like to split a single RxJs stream into two streams and perform two different async tasks and finally merge those 2 streams back into one. The best solution I have so far is:
import { fromEvent, debounceTime, merge } from 'rxjs';
// SOURCE stream
const clicks = fromEvent(document, 'click').pipe(share());
const stream$ = merge(
clicks.pipe( // stream 1
debounceTime(1000),
map((x) => 'debounceA')
),
clicks.pipe( // stream 2
debounceTime(2000),
map((x) => 'debounceB')
)
);
stream$.subscribe((x) => console.log(x));
// OUTPUT:
// debounceA (after 1 second)
// debounceB (after 2 seconds)
Now, what I'm wondering about, is this the best solution as I have now clicks.pipe
twice in my code. I've added share
to make sure that code (not in this example) runs twice. But I was thinking, is it possible with RxJs to somehow swap the clicks.pipe
and the merge
, something like this:
...
const stream$ = clicks.pipe(
merge(
debounceTime(1000).pipe(map((x) => 'debounceA')),
debounceTime(2000).pipe(map((x) => 'debounceB'))
)
);
...
Although this doesn't work it somehow makes more sense I think. Is something like this possible or are there better RxJs ways to do what I showed here?
If I understand the problem right, what you do not like is the duplication of the logic within clicks.pipe(...)
block.
If this is the case, I would use an higher level function like this
function streamFactory(cliks: Observable<any>, debTime: number, msg: string) {
return clicks.pipe( // stream 1
debounceTime(debTime),
map((x) => msg)
)
}
const stream$ = merge(
streamFactory(clicks, 1000, 'debounceA'),
streamFactory(clicks, 2000, 'debounceB'),
);