Search code examples
angulartypescriptrxjsobservable

RxJS Observable append new emitted values to existing ones instead of replacing them


What I'm trying to achieve is having an Observable that adds more data to it every time a new event is triggered. The scenario is as follows:

Say we have an elementsService with getElements(pageNo: number) method that does an http call to retrieve some elements.

  1. On page load, have an observable that retrieves the first page.
@Component({
  template: '<div>{{ elements$ | async }}</div>'
})
export class MyComponent {
  page: BehaviorSubject<number> = new BehaviorSubject(0);
  elements$: Observable<any> = this.page.pipe(
    mergeMap((page) => _service.getElements(page))
  );

  constructor(private _service: Service) {}
  // ...
}

Assume first result is elements = [ e1, e2 ]

  1. After some time, a new page event is emitted
export class MyComponent {
  // ...
  onPageChange(pageNo: number): void {
    this.page.next(pageNo);
  }
}

The next page will result in elements = [ e3, e4 ], which means the original values e1 and e2 are gone. How can I change this such that elements e3 and e4 are appended to the previous Observable result instead of replacing them?


Solution

  • You can use the scan operator to accumulate results like this:

      elements$: Observable<any> = this.page.pipe(
        mergeMap((page) => _service.getElements(page)),
        scan((all, elements) => all.concat(elements), [])
      );
    

    Each time scan receives an emission, it will run the function and emit the result.

    (all, elements) => all.concat(elements)
    
    • elements is the incoming emission (received from the _service.getElements() call).

    • all is the previously computed value. (or the "seed value" for the initial run)