rxjs rxjs6

Pausable buffer with RxJS


I'm trying to implement a togglable auto-save feature using RxJS streams. The goal is to:

  • While auto-save is enabled, send changes to the server as they come.
  • While auto-save is disabled, buffer the changes and send them to the server when auto-save is re-enabled.

Here is what I came up with:

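import { BehaviorSubject, Subject } from 'rxjs';
import { bufferToggle, concatMap, filter } from 'rxjs/operators';

const autoSave$ = new BehaviorSubject(true);
const change$ = new Subject();

change$.pipe(
  // Open a buffer when auto-save is turned off, close it when it is turned back on
  bufferToggle(
    autoSave$.pipe(filter(autoSave => autoSave === false)),
    () => autoSave$.pipe(filter(autoSave => autoSave === true)),
  ),
  concatMap(changes => changes),                 // flatten each buffered array
  concatMap(change => apiService.patch(change)), // send changes one at a time, in order
).subscribe(
  () => console.log('Change sent'),
  (error) => console.error(error),
);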

Thanks to bufferToggle, I'm able to buffer the changes while autoSave is off and send them when it's re-enabled.

The problem is that while autoSave is enabled, nothing passes through. I understand it's because bufferToggle drops any value that arrives while no buffer is open, that is, before its opening observable has emitted.

I feel that I should have a condition there to bypass the bufferToggle while autoSave is enabled, but all my attempts have failed miserably.

Any idea how to achieve this?


Solution

  • We can buffer events between auto-save turning off and back on using bufferToggle(off, on), and open a pass-through window between on and off using windowToggle(on, off). Then we merge the two together:

    [Image: pausable buffer with bufferToggle and windowToggle]

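    import { merge } from 'rxjs';
    import { bufferToggle, filter, mergeMap, windowToggle } from 'rxjs/operators';

    const on$ = autoSave$.pipe(filter(v => v));   // emits when auto-save is turned on
    const off$ = autoSave$.pipe(filter(v => !v)); // emits when auto-save is turned off

    const output$ = merge(
      // auto-save off: collect changes into an array, emitted when it is turned back on
      change$.pipe(bufferToggle(off$, () => on$)),
      // auto-save on: let changes through in a window that closes when it is turned off
      change$.pipe(windowToggle(on$, () => off$)),
    ).pipe(
      mergeMap(x => x), // flatten buffers (arrays) and windows (Observables)
    );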
    

    Play with this example at https://thinkrx.io/gist/3d5161fc29b8b48194f54032fb6d2363

    * Please note that because bufferToggle wraps values in an array, I've used another flatMap(v=>v) (flatMap is an alias of mergeMap) in the example to unwrap buffered values. You might want to remove that line to get arrays from buffers mixed with raw values.

    Also, check my article "Pausable Observables in RxJS" to see more examples.

    Hope this helps