RxJS - Buffer unique values and emit when no other values have been emitted for x seconds?


A user types values into a form, and an event is emitted every time they edit a field. The emitted value is the name of the field that was edited.

For example, a user typing three times into the description field and then twice into the name field would produce:

"description" => "description" => "description" => "name" => "name" => ...

I want to buffer unique values and emit them as an array when the user stops typing for x seconds. A value may reappear in a later buffer window.

Essentially this is to track which fields were updated when the user stopped typing and communicate with the server to save the edited values.


What I have so far emits every 3000 ms, whether or not the user is still typing. It also does not prevent duplicates while buffering; instead, the array is deduplicated afterwards.

    this.update$
      .bufferTime(3000)
      .filter(buffer => buffer.length > 0)
      .map(buffer => [...new Set(buffer)])
      .subscribe(x => console.log(x));

So it should listen until a value is emitted and then buffer unique values until no more values have been emitted for x seconds, then emit the buffer and repeat. How can one achieve this?


Solution

  • This could be an alternate version:

    const { Observable } = Rx;
    
    const log = (prefix) => (...args) => { console.log(prefix, ...args); };
    
    const inputs = document.querySelectorAll('input');
    
    const updates$ = Observable
      .fromEvent(inputs, 'input')
      .pluck('target', 'id')
    ;
    
    // wait x ms after last update
    const flush$ = updates$.debounceTime(3000);
    
    const buffered$ = updates$
      // use `distinct` without `keySelector`, but reset with flush$
      .distinct(null, flush$)
      
      // flush the buffer using flush$ as `notifier`
      .buffer(flush$)
    ;
    
    buffered$.subscribe(log('buffered$ =>'));

    <script src="https://unpkg.com/@reactivex/rxjs@^5/dist/global/Rx.min.js"></script>
    
    <div><input type="text" placeholder="foo" id="input.foo"></div>
    <div><input type="text" placeholder="bar" id="input.bar"></div>
    <div><input type="text" placeholder="baz" id="input.baz"></div>
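
For comparison, here is a minimal sketch of the same idea in plain JavaScript, without RxJS. It may help to see what `distinct` plus `buffer` with a debounced notifier amounts to: duplicates are dropped as they arrive, every new value restarts the quiet period, and the collected values are emitted and cleared once the quiet period elapses. The names `createQuietBuffer`, `push`, `onFlush` and the injectable `timers` argument are hypothetical, chosen for illustration only; they are not part of RxJS or of the answer above.

```javascript
// Hypothetical helper (not part of RxJS): collects unique values and hands
// them to `onFlush` once no value has arrived for `quietMs` milliseconds.
// `timers` is an assumed injection point so the timing can be faked in tests.
function createQuietBuffer(
  quietMs,
  onFlush,
  timers = {
    set: (fn, ms) => setTimeout(fn, ms),
    clear: (handle) => clearTimeout(handle),
  }
) {
  const seen = new Set(); // drops duplicates on arrival, keeps first-seen order
  let handle = null;      // pending quiet-period timer, if any

  const flush = () => {
    handle = null;
    const values = [...seen];
    seen.clear();         // a value may reappear in the next window
    onFlush(values);
  };

  return {
    push(value) {
      seen.add(value);
      if (handle !== null) timers.clear(handle);
      handle = timers.set(flush, quietMs); // every value restarts the quiet period
    },
  };
}
```

In this sketch, calling `push(event.target.id)` from an `input` event handler would play the role of `updates$`, and `onFlush` would receive the array of edited field ids to send to the server. Nothing is emitted while no values arrive, so there is no need to filter out empty buffers.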