Search code examples
rxjsrxjs7

How to combine 2 streams which both originate from the same source


I would like to split a single RxJs stream into two streams and perform two different async tasks and finally merge those 2 streams back into one. The best solution I have so far is:

import { fromEvent, debounceTime, merge } from 'rxjs';

// SOURCE stream
const clicks = fromEvent(document, 'click').pipe(share());

const stream$ = merge(
    clicks.pipe( // stream 1
        debounceTime(1000),
        map((x) => 'debounceA')
    ),
    clicks.pipe( // stream 2
        debounceTime(2000),
        map((x) => 'debounceB')
    )
);

stream$.subscribe((x) => console.log(x)); 

// OUTPUT:
//   debounceA (after 1 second)
//   debounceB (after 2 seconds)

Now, what I'm wondering about, is this the best solution as I have now clicks.pipe twice in my code. I've added share to make sure that code (not in this example) runs twice. But I was thinking, is it possible with RxJs to somehow swap the clicks.pipe and the merge, something like this:

...
const stream$ = clicks.pipe(
    merge(
      debounceTime(1000).pipe(map((x) => 'debounceA')),
      debounceTime(2000).pipe(map((x) => 'debounceB'))
   )
);
...

Although this doesn't work it somehow makes more sense I think. Is something like this possible or are there better RxJs ways to do what I showed here?

DEMO


Solution

  • If I understand the problem right, what you do not like is the duplication of the logic within clicks.pipe(...) block.

    If this is the case, I would use an higher level function like this

    function streamFactory(cliks: Observable<any>, debTime: number, msg: string) {
        return clicks.pipe( // stream 1
            debounceTime(debTime),
            map((x) => msg)
        )
    }
    
    const stream$ = merge(
        streamFactory(clicks, 1000, 'debounceA'),
        streamFactory(clicks, 2000, 'debounceB'),
    );
    

    DEMO