Search code examples
angularrxjsobservable

RxJS: Combine two observables so the observable only emits if the value of the first observable is true


I'm currently trying to implement a simple offline/online sync mechanism using observables.

Basically, I have two observables:

  1. Connection observable: The first observable gives me the information whether or not there is an internet connections. It emits when the network state changes
  2. Data observable: The second observable has the data that needs to synced. It emits when there is new data to be synced

What I want to achieve is to combine the above observables so that:

  • As long as the connection state is false, the combined observable shouldn't emit. In this case, the data observable should retain its state
  • As long as the connection state is true, the combined observable should emit every time there is data in the data observable
  • If the connection state switches from false to true, it should emit for every value on the data observable

A small example that currently uses filter and combineLatest can be found here: https://codesandbox.io/s/offline-sync-s5lv49?file=/src/index.js

Unfortunately, this doesn't behave as intended at all.

Is there any operator to achieve the required behavior? As an alternative, I could maybe poll the connection status of course and emit every X seconds. But ideally, I'd like a clean combination of Observables, though I'm a bit lost with what operator makes most sense.

To clear the idea up: I need to sync all data, not just the latest. So the data observable should buffer the data.


Solution

  • After also searching for the term "gate", I found the following stack overflow question and post: Conditional emission delays with rxjs

    Basically, the answer is using delayWhen to achieve the desired result.

    I've updated an example here: https://codesandbox.io/s/offline-sync-experiments-nimoox?file=/src/index.js:0-1357

    The crucial part is:

    const offlineOnlineSubject = new BehaviorSubject(false);
    const dataSubject = new Subject();
    
    const triggerFn = (_) => offlineOnlineSubject.pipe(filter((v) => v));
    
    dataSubject.pipe(delayWhen(triggerFn)).subscribe((counter) => {
      console.log("Syncing data", {
        counter
      });
    
      syncedIndicator.innerHTML += `<li>${counter}</li>`;
    });
    

    Wrapped in a custom typescript operator:

    import { MonoTypeOperatorFunction, Observable } from 'rxjs';
    import { delayWhen, filter } from 'rxjs/operators';
    
    export function gate<T>(gateTrigger: Observable<boolean>): MonoTypeOperatorFunction<T> {
      const gateTriggerFn = () => gateTrigger.pipe(
        filter((v) => v)
      );
    
      return (source: Observable<T | null | undefined>) => source.pipe(
        delayWhen(gateTriggerFn)
      );
    }
    

    It seems so far that this solution is doing what I intend it to do.