Search code examples
angulartypescriptrxjsobservable

Combine arrays from observables without waiting for all to complete


In my application I retrieve data from a remote server at a definite interval. Data is formatted as an array of objects and displayed in the HTML template via the angular async pipe.
I am using shareReplay() to provide caching functionality:

  private refresherToggleSales$: Subject<void>;
  private displayLastSales$: Observable<Sale[]>;

  public InjectSale$: Subject<Sale[]>;
  
  [...]

   get LastSales() : Observable<Sale[]> {
    if (!this.displayLastSales$) {
      const rTimer$ = timer(0, DefaultRefresh);

      this.displayLastSales$ = rTimer$.pipe(
        switchMap(_ => this.getLastSales()),
        shareReplay(this.CACHE_SIZE),
        takeUntil(this.refresherToggleSales$)
      );
    }

    return merge(this.displayLastSales$, this.InjectSale$);
  }

  private getLastSales() : Observable<Sale[]> {
    // get data from server
  }

Whenever a sale is manually input in the application, I would like for it to be displayed immediately without having to wait for it to come from the server.

However if I call InjectLastSale$.next() with a 1-element array it will completely replace all previous data.
How can I combine the two streams (without having one to wait on the other to complete)?


Solution

  • You're close! Just change up where you place your merge.

    get LastSales() : Observable<Sale[]> {
        
      if (!this.displayLastSales$) {
        this.displayLastSales$ = timer(0, DefaultRefresh).pipe(
          switchMap(_ => this.getLastSales().pipe(
            mergeWith(this.InjectSale$),
            scan((acc, vals) => [...vals, ...acc], <Sale[]>[])
          )),
          shareReplay(this.CACHE_SIZE),
          takeUntil(this.refresherToggleSales$)
        );
      }
    
      return this.displayLastSales$;
    }