Search code examples
javascripttypescriptrxjsreactive-programmingreactivex

Run action once per Observable's value emission


Suppose we have the following code (Stackblitz):

  public counter: number = 0; // Some arbitrary global state

  public ngOnInit(): void {
    const subj: Subject<number> = new Subject<number>();

    const obs: Observable<number> = subj.asObservable().pipe(
      tap((): void => {
        ++this.counter;
      }));

    // this.counter increments for each subscription.
    // But supposed to increment once per obs new value emition.
    obs.subscribe();
    obs.subscribe();
    obs.subscribe();

    subj.next(1);

    // Now this.counter === 3
  }

The result is that this.counter equals 3 after executing the above, it has been incremented one time per each subscription.

But how can we increment it once per Observable's new value emission in a Reactive way?

I know that we should avoid state outside the Reactive flow, but still it is needed time to time in real world apps.


Solution

  • The reason why you see number 3 is a little different. When you use of(1) it emits one next notification and right after that the complete notification. It does this every time you call obs.subscribe().

    So if you want to see 1 you need make an intermediate Subject where you make all subscriptions and then this intermediate Subject subscribe to the source Observable (with publish() operator most easily that creates the Subject for you):

    const obs: ConnectableObservable<number> = of(1).pipe(
      tap((): void => {
        ++this.counter;
      }),
      publish(),
    ) as ConnectableObservable<number>;
    
    obs.subscribe();
    obs.subscribe();
    obs.subscribe();
    
    obs.connect();
    

    Your updated demo: https://stackblitz.com/edit/rxjs-update-global-state-ejdkie?file=app/app.component.ts

    Be aware that of(1) will emit complete notification anyway which means that the subject inside publish() will complete as well. This might (or might not) be a problem for you but it really depends on your use-case.

    Edit: This is updated demo where source is Subject: https://stackblitz.com/edit/rxjs-update-global-state-bws36m?file=app/app.component.ts