Search code examples
angularrxjsreactivex

RxJs: buffer events when condition is true, pass events through when condition is false


I created the Observable constructor below that works as described. Does anyone know if there is a more concise way of achieving the same behaviour using the operators that come with RxJs? I was looking at bufferToggle which is close to the required behaviour, but I need the emitted values to be passed through when the buffer is closed.

Function Description: Buffers the emitted source values if the condition emits true, and passes through the emitted source values if the condition emits false. If the condition emits false after being true, the buffer releases each value in the order that they were received. The buffer is initialised to pass through the emitted source values until the condition emits true.

function bufferIf<T>(condition: Observable<boolean>, source: Observable<T>): Observable<T> {
  return new Observable<T>(subscriber => {
    const subscriptions: Subscription[] = [];
    const buffer = [];
    let isBufferOpen = false;

    subscriptions.push(
      // handle source events
      source.subscribe(value => {
        // if buffer is open, or closed but buffer is still being 
        // emptied from previously being closed.
        if (isBufferOpen || (!isBufferOpen && buffer.length > 0)) {
          buffer.push(value);

        } else {
          subscriber.next(value);
        }
      }),

      // handle condition events
      condition.do(value => isBufferOpen = value)
        .filter(value => !value)
        .subscribe(value => {
          while (buffer.length > 0 && !isBufferOpen) {
            subscriber.next(buffer.shift());
          }
        })
    );

    // on unsubscribe
    return () => {
      subscriptions.forEach(sub => sub.unsubscribe());
    };
  });
}

Edit

In response to comment, the following is the same function as the one above but in the form of an RxJs Operator and updated to use RxJx 6+ pipeabale Operators:

 function bufferIf<T>(condition: Observable<boolean>): MonoTypeOperatorFunction<T> {
   return (source: Observable<T>) => {
     return new Observable<T>(subscriber => {
       const subscriptions: Subscription[] = [];
       const buffer: T[] = [];
       let isBufferOpen = false;
   
       subscriptions.push(
         // handle source events
         source.subscribe(value => {
           // if buffer is open, or closed but buffer is still being 
           // emptied from previously being closed.
           if (isBufferOpen || (!isBufferOpen && buffer.length > 0)) {
             buffer.push(value);
           } else {
             subscriber.next(value);
           }
         }),
   
         // handle condition events
         condition.pipe(
          tap(con => isBufferOpen = con),
          filter(() => !isBufferOpen)
         ).subscribe(() => {
          while (buffer.length > 0 && !isBufferOpen) {
            subscriber.next(buffer.shift());
          }
        })
       );
   
       // on unsubscribe
       return () => subscriptions.forEach(sub => sub.unsubscribe());
     });
   }
}

Solution

  • I've found a solution based on operators rather than subscriptions, but hesitate to call it more concise.

    Note, the endToken can be removed if it's possible to guarantee the buffer on/off stream always ends with an off (i.e an odd number of emits).

    console.clear() 
    const Observable = Rx.Observable
    
    // Source and buffering observables
    const source$ = Observable.timer(0, 200).take(15)
    const bufferIt$ = Observable.timer(0, 500).map(x => x % 2 !== 0).take(6)  
    
    // Function to switch buffering
    const endToken = 'end'            
    const bufferScanner = { buffering: false, value: null, buffer: [] }
    const bufferSwitch = (scanner, [src, buffering]) => { 
      const onBufferClose = (scanner.buffering && !buffering) || (src === endToken)
      const buffer = (buffering || onBufferClose) ? scanner.buffer.concat(src) : []
      const value = onBufferClose ? buffer : buffering ? null : [src]
      return { buffering, value, buffer }
    }
          
    // Operator chain
    const output = 
      source$
        .concat(Observable.of(endToken))     // signal last buffer to emit
        .withLatestFrom(bufferIt$)           // add buffering flag to stream
        .scan(bufferSwitch, bufferScanner)   // turn buffering on and off
        .map(x => x.value)                   // deconsruct bufferScanner
        .filter(x => x)                      // ignore null values
        .mergeAll()                          // deconstruct buffer array
        .filter(x => x !== endToken)         // ignore endToken
    
    // Proof
    const start = new Date()
    const outputDisplay = output.timestamp()
      .map(x => 'value: ' + x.value + ', elapsed: ' + (x.timestamp - start) )
    const bufferDisplay = bufferIt$.timestamp()
      .map(x => (x.value ? 'buffer on' : 'buffer off') + ', elapsed: ' + (x.timestamp - start) )
    bufferDisplay.merge(outputDisplay)
      .subscribe(console.log)
      
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>

    Footnote

    I also found a solution based on buffer(), but am not convinced it's stable with a high-frequency source. There seems to be something hokey with certain buffer configurations (i.e the declaration looks sound but tests show occasional delays which interfere with buffer operation).

    In any case, for reference,

    /* 
      Alternate with buffered and unbuffered streams
    */
    
    const buffered = 
       source$.withLatestFrom(bufferIt$)      
        .filter(([x, bufferIsOn]) => bufferIsOn)  
        .map(x => x[0])
        .buffer(bufferIt$.filter(x => !x))
        .filter(x => x.length)       // filter out empty buffers
        .mergeAll()                  // unwind the buffer
    
    const unbuffered =
      source$.withLatestFrom(bufferIt$)      
        .filter(([x, bufferIsOn]) => !bufferIsOn)    
        .map(x => x[0])
    
    const output = buffered.merge(unbuffered)