angular, rxjs, rxjs-pipeable-operators

Filter out observableB if observableA was triggered 1s before


Let's say I have two observables, observableA and observableB.

There are 3 cases for how these two observables can be triggered:

  1. Only observableA gets triggered.
  2. Only observableB gets triggered.
  3. observableA gets triggered first, followed by observableB 1 second later.

How can I handle these two observables so that the callback function doesn't execute twice in case 3?


Solution

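  • The question leaves one thing unstated: whether it is acceptable to wait and see if B fires after an A. It has to be, because the only way to solve this is with a time window: after A fires, either the window elapses and you proceed with A's logic, or B arrives within it and you go with B's logic instead. bufferTime can serve as that window: merge A and B together, then buffer the merged stream. Note that bufferTime uses fixed windows measured from subscription rather than from the moment A fires, so an A late in one window and a B early in the next end up in separate buffers.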

    // Importing operators from 'rxjs' directly requires RxJS 7.2 or later.
    import { interval, bufferTime, merge, map, switchMap, timer } from 'rxjs';

    // Demo sources: each emits roughly once per second, at a random offset.
    const obsA = interval(1000).pipe(
      switchMap(() => timer(Math.random() * 1000)),
      map(() => 'A'),
    );
    const obsB = interval(1000).pipe(
      switchMap(() => timer(Math.random() * 1000)),
      map(() => 'B'),
    );

    // Collect everything emitted in each 1-second window, then decide once per window.
    merge(obsA, obsB)
      .pipe(bufferTime(1000))
      .subscribe((x) => {
        if (x.length === 0) return; // neither A nor B fired in this window
        const hasA = x.includes('A');
        const hasB = x.includes('B');
        if (hasA && !hasB) {
          console.log('Only A', x);
        } else if (hasB && !hasA) {
          console.log('Only B', x);
        } else {
          console.log('Both A and B', x);
        }
      });
    

    An empty array means neither A nor B fired during that window, so that case has to be handled by simply ignoring it.
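
To see how the buffering decision plays out without running any timers, here is a minimal sketch in plain TypeScript (no RxJS). It only approximates bufferTime by assigning each event to a fixed-width window by its timestamp, then classifies each window. All the names (`Source`, `TimedEvent`, `BufferKind`, `classifyBuffer`, `groupIntoWindows`) are made up for illustration, and the timestamps are invented sample data.

```typescript
// Hypothetical types and helpers for illustration; none of these come from RxJS.
type Source = "A" | "B";
type BufferKind = "empty" | "onlyA" | "onlyB" | "both";

interface TimedEvent {
  source: Source;
  atMs: number; // time since subscription, in milliseconds
}

// Decide which branch a buffered batch belongs to.
function classifyBuffer(items: Source[]): BufferKind {
  if (items.length === 0) return "empty"; // nothing fired: ignore this window
  const hasA = items.some((s) => s === "A");
  const hasB = items.some((s) => s === "B");
  if (hasA && hasB) return "both";
  return hasA ? "onlyA" : "onlyB";
}

// Approximate fixed windows: window k covers [k * spanMs, (k + 1) * spanMs).
function groupIntoWindows(
  events: TimedEvent[],
  spanMs: number,
  windowCount: number
): Source[][] {
  const windows: Source[][] = [];
  for (let k = 0; k < windowCount; k++) windows.push([]);
  for (const e of events) {
    const k = Math.floor(e.atMs / spanMs);
    if (k >= 0 && k < windowCount) windows[k].push(e.source);
  }
  return windows;
}

// Invented sample timeline.
const events: TimedEvent[] = [
  { source: "A", atMs: 200 },  // window 0: A followed by B
  { source: "B", atMs: 700 },
  { source: "A", atMs: 1900 }, // window 1: A alone...
  { source: "B", atMs: 2100 }, // ...because its B falls into window 2
];

const kinds: BufferKind[] = groupIntoWindows(events, 1000, 4).map(classifyBuffer);
console.log(kinds.join(", ")); // both, onlyA, onlyB, empty
```

The last two events are only 200 ms apart but straddle a window boundary, so they are treated as two separate triggers. If that matters for your case, the window would need to start when A fires rather than on a fixed schedule.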