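I'm trying to implement a togglable auto-save feature using RxJS streams. The goal is to send changes to the server as they come in while auto-save is enabled, and to buffer them while it is disabled, sending the buffered changes once auto-save is re-enabled.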
Here is what I came up with:
autoSave$ = new BehaviorSubject(true);
change$ = new Subject();
change$.pipe(
  bufferToggle(
    autoSave$.pipe(filter(autoSave => autoSave === false)),
    () => autoSave$.pipe(filter(autoSave => autoSave === true)),
  ),
  concatMap(changes => changes),
  concatMap(change => apiService.patch(change)),
).subscribe(
  () => console.log('Change sent'),
  (error) => console.error(error),
);
Thanks to bufferToggle, I'm able to buffer the changes while autoSave is off and send them when it's re-enabled.
Problem is that while autoSave is enabled, nothing passes through. I understand it's because bufferToggle ignores the flow coming while its opening observable doesn't emit.
I feel that I should have a condition there to bypass the bufferToggle while autoSave is enabled, but all my attempts miserably failed.
Any idea how to achieve this?
We can buffer events while auto-save is off using bufferToggle(off, on), and open a pass-through window while auto-save is on using windowToggle(on, off). Then we merge the two together:
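const on$ = autoSave$.filter(v=>v);
const off$ = autoSave$.filter(v=>!v);
const output$ =
  Observable.merge(
    // buffer changes from the moment auto-save goes off until it comes back on
    changes$
      .bufferToggle(
        off$,
        ()=>on$
      ),
    // let changes through in a window while auto-save is on
    changes$
      .windowToggle(
        on$,
        ()=>off$
      )
  )
  .flatMap(x=>x) // < flatten buffer and window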
Play with this example at https://thinkrx.io/gist/3d5161fc29b8b48194f54032fb6d2363
* Please note that since buffer wraps values in an Array, I've used another flatMap(v=>v) in the example to unwrap buffered values. You might want to disable this particular line to get arrays from buffers mixed with raw values.
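To make the expected ordering concrete, here is a minimal dependency-free sketch in plain JavaScript (no RxJS) that models what the merged stream is meant to output once everything is flattened. The function name simulatePausableAutoSave and the event shape are hypothetical, made up for this illustration. It is only a reference model to compare a pipeline against, not a replacement for the operators above.

```javascript
// Hypothetical event shape, for illustration only:
//   { type: 'toggle', on: boolean }  -> auto-save switched on or off
//   { type: 'change', value: any }   -> a change that should reach the server
function simulatePausableAutoSave(events, initiallyOn = true) {
  let autoSave = initiallyOn;
  let pending = [];  // changes held back while auto-save is off
  const sent = [];   // changes forwarded downstream, in order

  for (const event of events) {
    if (event.type === 'toggle') {
      autoSave = event.on;
      // Re-enabling auto-save flushes everything buffered so far.
      if (autoSave && pending.length > 0) {
        sent.push(...pending);
        pending = [];
      }
    } else if (autoSave) {
      // Auto-save is on: the change passes straight through.
      sent.push(event.value);
    } else {
      // Auto-save is off: hold the change until the next "on".
      pending.push(event.value);
    }
  }
  return { sent, pending };
}

const result = simulatePausableAutoSave([
  { type: 'change', value: 'a' },
  { type: 'toggle', on: false },
  { type: 'change', value: 'b' },
  { type: 'change', value: 'c' },
  { type: 'toggle', on: true },
  { type: 'change', value: 'd' },
  { type: 'toggle', on: false },
  { type: 'change', value: 'e' },
]);
console.log(result.sent);    // [ 'a', 'b', 'c', 'd' ]
console.log(result.pending); // [ 'e' ]
```

Here 'a' and 'd' correspond to values that go through the window, 'b' and 'c' to values released from the buffer, and 'e' stays buffered because auto-save is never switched back on.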
Also, check my article "Pausable Observables in RxJS" to see more examples.
Hope this helps