I'm currently trying to implement a simple offline/online sync mechanism using observables.
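Basically, I have two observables: a connection observable that emits true when online and false when offline, and a data observable that emits the values to sync.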
What I want to achieve is to combine the above observables so that:
- when the connection status is false, the combined observable shouldn't emit. In this case, the data observable should retain its state
- when the connection status is true, the combined observable should emit every time there is data in the data observable
- when the connection status changes from false to true, it should emit for every value on the data observable

A small example that currently uses filter and combineLatest can be found here: https://codesandbox.io/s/offline-sync-s5lv49?file=/src/index.js
Unfortunately, this doesn't behave as intended at all.
Is there an operator that achieves the required behavior? As an alternative, I could of course poll the connection status and emit every X seconds. Ideally, though, I'd like a clean combination of observables; I'm just a bit lost as to which operator makes the most sense.
To clarify: I need to sync all data, not just the latest value, so the data observable should buffer its values while the connection is offline.
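To make the requirement concrete, here is a minimal plain TypeScript sketch of the behavior I'm after, deliberately written without RxJS. The Gate class and the names push, setOpen, and syncGate are hypothetical and only for illustration; it models the semantics (buffer everything while offline, flush in arrival order once online) and says nothing about how an RxJS operator would implement it.

```typescript
// Hypothetical helper that models the desired gate semantics; not part of RxJS.
class Gate<T> {
  private pending: T[] = []; // values received while the gate was closed
  private open = false; // false = offline, true = online
  private readonly emit: (value: T) => void;

  constructor(emit: (value: T) => void) {
    this.emit = emit;
  }

  // Called for every value on the data side.
  push(value: T): void {
    if (this.open) {
      this.emit(value); // online: pass the value straight through
    } else {
      this.pending.push(value); // offline: keep every value, not just the latest
    }
  }

  // Called for every value on the connection side.
  setOpen(open: boolean): void {
    this.open = open;
    if (!open) return;
    // Going online: flush everything that was buffered, in arrival order.
    const queued = this.pending;
    this.pending = [];
    for (const value of queued) this.emit(value);
  }
}

const synced: number[] = [];
const syncGate = new Gate<number>((value) => synced.push(value));

syncGate.push(1); // offline: buffered
syncGate.push(2); // offline: buffered
syncGate.setOpen(true); // flushes 1 and 2
syncGate.push(3); // online: emitted immediately
syncGate.setOpen(false);
syncGate.push(4); // offline again: buffered until the gate reopens
```

What I'm looking for is an operator, or a combination of operators, that gives the same result on two observables.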
After also searching for the term "gate", I found the following Stack Overflow question and its answer: Conditional emission delays with rxjs
Basically, the answer uses delayWhen to achieve the desired result: delayWhen holds back each source value until the observable returned for that value emits.
I've updated an example here: https://codesandbox.io/s/offline-sync-experiments-nimoox?file=/src/index.js:0-1357
The crucial part is:
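import { BehaviorSubject, Subject } from "rxjs";
import { delayWhen, filter } from "rxjs/operators";

// true = online, false = offline; starts offline
const offlineOnlineSubject = new BehaviorSubject(false);
const dataSubject = new Subject();

// Notifier for delayWhen: emits only when the connection status is true
const triggerFn = (_) => offlineOnlineSubject.pipe(filter((v) => v));

// Each value is held back until the notifier emits, i.e. until we are online
dataSubject.pipe(delayWhen(triggerFn)).subscribe((counter) => {
  console.log("Syncing data", { counter });
  // syncedIndicator is a DOM element that is not shown in this excerpt
  syncedIndicator.innerHTML += `<li>${counter}</li>`;
});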
Wrapped in a custom TypeScript operator:
import { MonoTypeOperatorFunction, Observable } from 'rxjs';
import { delayWhen, filter } from 'rxjs/operators';

// Holds back every source value until gateTrigger emits true.
export function gate<T>(gateTrigger: Observable<boolean>): MonoTypeOperatorFunction<T> {
  // Notifier for delayWhen: only true values open the gate
  const gateTriggerFn = () => gateTrigger.pipe(
    filter((v) => v)
  );

  // source is typed as Observable<T> so the result matches MonoTypeOperatorFunction<T>
  return (source: Observable<T>) => source.pipe(
    delayWhen(gateTriggerFn)
  );
}
So far, this solution seems to do what I intend.
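As a quick check, here is a small usage sketch of the gate operator. It repeats the operator so that it runs on its own, and the names online$, data$, and synced are hypothetical. It assumes the gate trigger is a BehaviorSubject: delayWhen subscribes to the trigger once per value, so a trigger that does not replay its current state (a plain Subject, for example) would hold back values that arrive while online until the next true is emitted.

```typescript
import { BehaviorSubject, Observable, Subject } from "rxjs";
import type { MonoTypeOperatorFunction } from "rxjs";
import { delayWhen, filter } from "rxjs/operators";

// Same operator as above, repeated so that this sketch is self-contained.
function gate<T>(gateTrigger: Observable<boolean>): MonoTypeOperatorFunction<T> {
  const gateTriggerFn = () => gateTrigger.pipe(filter((open) => open));
  return (source: Observable<T>) => source.pipe(delayWhen(gateTriggerFn));
}

// Hypothetical names, for illustration only.
const online$ = new BehaviorSubject<boolean>(false); // start offline
const data$ = new Subject<number>();
const synced: number[] = [];

data$.pipe(gate<number>(online$)).subscribe((value) => synced.push(value));

data$.next(1); // offline: held back
data$.next(2); // offline: held back
online$.next(true); // gate opens: the held values are released
data$.next(3); // online: passes straight through
online$.next(false);
data$.next(4); // offline again: held back until the next true
```

Because both subjects emit synchronously, the effect of each call can be inspected right after it, without timers.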