Tags: angular, typescript, rxjs, observable, ngrx

rxjs: merge 2 streams, complete stream when a specific input emits


I'm writing an Angular app and setting up a system where an NGRX effect requests data from a service.

Under the hood, this service will do two things:

  • Check the local cache (sqlite) for the requested data, and
  • Contact the API to get the most up-to-date data

I would like this service to return an observable that emits either only the API's response (if the API responds first or the cache misses), or the cached value followed by the API's response, in that order. I've visualised these two scenarios below:

Scenario 1 (API is first/cache miss)

API   ---1
CACHE ---------2
MERGE ---1

Scenario 2 (Cache hit)

API   ---------1
CACHE ---2
MERGE ---2-----1

I'm thinking of creating a Subject, then firing off both calls and completing the Subject when the API responds (either successfully or with an error).

Pseudo code:

const subject = new Subject();

this.cache.getData().then(res => subject.next(res)).catch(err => subject.error(err));
this.api.getData().then(res => subject.next(res)).catch(err => subject.error(err)).finally(() => subject.complete());

return subject.asObservable();

But I'm wondering if there's a cleaner solution to what I'm trying to achieve here? Any input greatly appreciated.


Solution

  • If I understand correctly, you want to fire two calls: A) the API and B) the cache. If A returns first, emit that result and ignore the response from B. If B returns first, emit that, then also emit the response from A when it arrives.

    Essentially, you always want the response from A. You only want the response from B if it is received before A.

    If that's the case, you could define two source observables and merge them.

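    import { defer, merge } from 'rxjs';
    import { filter, share, takeUntil } from 'rxjs/operators';

    // Inside the service class:

    // share() makes both subscribers (merge and takeUntil) use a single API call
    private apiData$ = defer(() => this.api.getData()).pipe(share());

    // Skip cache misses, and stop listening to the cache once the API has emitted
    private cachedData$ = defer(() => this.cache.getData()).pipe(
      filter(existing => !!existing),
      takeUntil(this.apiData$)
    );

    data$ = merge(this.cachedData$, this.apiData$);
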

    Here we use defer to turn each Promise into an Observable, so the underlying call is only made upon subscription.

    apiData$ uses share() because we end up with two subscriptions to it (one from merge, one from takeUntil) and we don't want to fire a separate API call for each subscription.

    cachedData$ uses filter to suppress emissions you don't want (when data doesn't exist in cache).

    takeUntil completes the cachedData$ observable as soon as apiData$ emits a value, so cachedData$ will never emit after apiData$.

    Finally, merge is used to combine both sources into a single observable, so data$ will emit whenever either source emits.
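
    To check the ordering rules without real services, here is a minimal sketch that wraps the same operators in a standalone function and drives it with Subjects instead of Promises. The names cacheThenApi and simulate are hypothetical and not part of the original answer. The filter here drops only null/undefined rather than every falsy value, which is an assumption about what a cache miss looks like.

```typescript
import { Observable, Subject, merge } from 'rxjs';
import { filter, share, takeUntil } from 'rxjs/operators';

// Hypothetical helper: emits cache hits only until the API has emitted,
// and always emits the API response.
function cacheThenApi<T>(
  cache$: Observable<T | null | undefined>,
  api$: Observable<T>
): Observable<T> {
  // One underlying API subscription shared by merge and takeUntil.
  const sharedApi$ = api$.pipe(share());
  const cached$ = cache$.pipe(
    // Assumption: a cache miss is represented as null or undefined.
    filter((value): value is T => value != null),
    takeUntil(sharedApi$)
  );
  return merge(cached$, sharedApi$);
}

// Hypothetical test harness: Subjects stand in for the cache and API calls
// so the arrival order can be controlled synchronously.
function simulate(
  steps: (api: Subject<string>, cache: Subject<string | null>) => void
): { seen: string[]; completed: boolean } {
  const api = new Subject<string>();
  const cache = new Subject<string | null>();
  const seen: string[] = [];
  let completed = false;
  cacheThenApi(cache, api).subscribe({
    next: value => seen.push(value),
    complete: () => { completed = true; },
  });
  steps(api, cache);
  return { seen, completed };
}

// Scenario 1: the API answers first, so the late cache value is dropped.
const apiFirst = simulate((api, cache) => {
  api.next('api'); api.complete();
  cache.next('cached'); cache.complete();
});

// Scenario 2: the cache answers first, then the API.
const cacheFirst = simulate((api, cache) => {
  cache.next('cached'); cache.complete();
  api.next('api'); api.complete();
});

// Cache miss: only the API value is emitted.
const cacheMiss = simulate((api, cache) => {
  cache.next(null); cache.complete();
  api.next('api'); api.complete();
});

console.log(apiFirst.seen);   // [ 'api' ]
console.log(cacheFirst.seen); // [ 'cached', 'api' ]
console.log(cacheMiss.seen);  // [ 'api' ]
```

    In all three runs the merged stream completes once the API stream completes, which matches the behaviour the question was trying to get from a manually managed Subject.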


    Here's a StackBlitz.