
RxJS: do something on the first emit from multiple subscriptions


Is there a clean way to do something once every one of several subscriptions has emitted for the first time?

e.g.:

this.subscription1 = this.service.getData1().subscribe(data => {
    this.data1 = data;
    console.log('1');
});

this.subscription2 = this.service.getData2().subscribe(data => {
    this.data2 = data;
    console.log('2');
});

// Do something after first emit from subscription1 AND subscription2
doSomething();

...

doSomething() {
    console.log('Hello world !');
}

Output goal:

1
2
Hello world !
1
2
1
1
2
1
2
2
...

Solution

  • I have needed this kind of isFirst operator several times: one that runs a callback only on the first emission. Here is a quick custom operator that keeps a single state variable, first, to decide whether an emission is the first one, and runs the callback (named predicate in the code) through the tap operator.

    Since it uses tap internally it does not modify the source emission in any way. It only runs the passed predicate when the emission is indeed first.

    Try the following

    isFirst() operator

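    import { Observable } from "rxjs";
    import { tap } from "rxjs/operators";

    // Runs `predicate` (a plain callback, despite the name) on the first emission only.
    export const isFirst = (predicate: any) => {
      // Created once per isFirst() call, so every subscription to the piped observable shares it.
      let first = true;
      return <T>(source: Observable<T>) => {
        return source.pipe(
          // tap only observes values, so emissions pass through unchanged.
          tap({
            next: _ => {
              if (first) {
                predicate();
                first = false;
              }
            }
          })
        );
      };
    };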
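
    The flag idea does not depend on RxJS. As a minimal sketch, assuming a hypothetical helper named runOnce (not part of RxJS or of the operator above), the same closure pattern looks like this:

```typescript
// Hypothetical helper for illustration only: wraps a callback so that
// it runs on the first call and is ignored on every later call.
function runOnce(callback: () => void): () => void {
  let first = true; // plays the same role as the `first` flag in isFirst()
  return () => {
    if (first) {
      first = false;
      callback();
    }
  };
}

let calls = 0;
const notify = runOnce(() => {
  calls += 1;
});

// Three "emissions", but the callback only runs for the first one.
notify();
notify();
notify();
```

    Because the flag lives in the closure, anything that shares the wrapped function also shares the flag. The same probably applies to isFirst() if the piped observable is subscribed more than once.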
    

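    To combine the streams, you could use the RxJS combineLatest function. It emits for the first time only after every source has emitted at least once, and then again whenever any source emits, so its first emission marks the moment when all sources have delivered a value.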

    Example

    import { Component } from "@angular/core";
    
    import { timer, Observable, Subject, combineLatest } from "rxjs";
    import { tap, takeUntil } from "rxjs/operators";
    
    @Component({
      selector: "my-app",
      template: `<button (mouseup)="stop$.next()">Stop</button>`
    })
    export class AppComponent {
      stop$ = new Subject<any>();
    
      constructor() {
        combineLatest(timer(2000, 1000), timer(3000, 500))
          .pipe(
            isFirst(_ => {
              console.log("first");
            }),
            takeUntil(this.stop$)
          )
          .subscribe({
            next: r => console.log("inside subscription:", r)
          });
      }
    }
    

    Working example: Stackblitz

    In your case it might look something like

    
    this.subscription = combineLatest(
      this.service.getData1().pipe(
        tap({
          next: data => {
            this.data1 = data;
            console.log('1');
          }
        })
      ), 
      this.service.getData2().pipe(
        tap({
          next: data => {
            this.data2 = data;
            console.log('2');
          }
        })
      )
    ).pipe(
      isFirst(_ => { 
        console.log("first"); 
      })
    ).subscribe({
      next: r => console.log("inside subscription:", r)
    });
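
    To check the ordering against the output goal in the question, here is a dependency-free sketch of the gating that combineLatest plus isFirst perform together. It is only a model under stated assumptions, not how RxJS is implemented. The name createFirstCombinedGate and the replayed emission order are assumptions made for illustration.

```typescript
// Hypothetical model: fires `onFirstCombined` once, as soon as every
// source index in [0, sourceCount) has reported at least one emission.
function createFirstCombinedGate(sourceCount: number, onFirstCombined: () => void) {
  const seen = new Set<number>(); // sources that have emitted at least once
  let fired = false; // same role as the `first` flag in isFirst()
  return (sourceIndex: number): void => {
    seen.add(sourceIndex);
    if (!fired && seen.size === sourceCount) {
      fired = true;
      onFirstCombined();
    }
  };
}

const log: string[] = [];
const emitted = createFirstCombinedGate(2, () => {
  log.push("Hello world !");
});

// Replay an emission order like the one in the question: 1, 2, 1, 2, 1, 1, 2.
for (const source of [1, 2, 1, 2, 1, 1, 2]) {
  log.push(String(source)); // per-source side effect (the tap in the answer)
  emitted(source - 1); // then report which source emitted
}
// log is now: 1, 2, Hello world !, 1, 2, 1, 1, 2
```

    The per-source side effect runs before the gate is notified, which mirrors the answer: each tap sits on its own source, upstream of combineLatest, so '1' and '2' are logged before the combined first emission is handled.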