Search code examples
angulartypescriptrxjsngrx

Ngrx, Rxjs - Merging observable streams returning a new stream


I am trying to join two observable streams to create a new one I can subscribe too. Both of the input streams return arrays of objects. I tried using the forkJoin operator followed by a pipe map and subscribe like so:

forkJoin(
  this.firstObservable$,
  this.secondObservable$
).pipe(
  map(([first, second]) => {
    // here create a new array array = [] and push new Object { first.something, second.something
    // return newArray
  }
  })
).subscribe((data: any) => {
  this.data = data;
});

But that doesn't seem to work as expected. I also tried this with tap. What would be the proper way?


Solution

  • Rxjs provides several operators for combining streams (you can find list of this operators here), in your case I can recommend you use combineLatest or forkJoin, which to use depend on type of input observables

    forkJoin - When all observables complete, emit the last emitted value from each.

    combileLatest - When any observable emits a value, emit the latest value from each

    notice: ngrx selectors not completes, if your input observables it's ngrx selectors, use combineLatest