Tags: rxjs, rxjs-observables, concatmap, mergemap

RxJS: execute concatMap in parallel


Is it possible to execute a higher-order observable in parallel, but still preserve the order when merging the results?

I have something looking like this:

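import { Observable } from 'rxjs';
import { concatMap, scan } from 'rxjs/operators';

// Declarations only; T stands for whatever element type the backend returns.
declare const invoker$: Observable<void>;
declare const fetch: (index: number) => Observable<T[]>;

invoker$
  .pipe(
    // concatMap starts fetch(index) only after the previous inner observable completes.
    concatMap((_, index) => fetch(index)),
    // Append each fetched array to the accumulated result.
    scan((acc: T[], values) => [...acc, ...values], [])
  )
  .subscribe(/* Do something with the array */);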

The idea is to have an observable that invokes a callback (e.g. a backend call that takes a considerable amount of time), which returns a new observable that emits a single value (an array of some generic type). The returned values should be concatenated into another array while preserving their original fetch order.

I would, however, like the requests to be fired in parallel. So if invoker$ emits rapidly, the requests are made in parallel and the results are merged, in fetch order, as they complete.

My understanding is that concatMap waits for one observable to complete before starting the next one. mergeMap runs them in parallel, but does nothing to preserve the order.


Solution

  • This behavior seems to be provided by the concatMapEager operator from the cartant/rxjs-etc library, written by Nicholas Jamieson (cartant), who is a developer on the core RxJS team.
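If pulling in a library is not an option, the underlying idea can be sketched by hand: let every request start immediately, tag each result with the index it was requested at, and hold back any result that arrives before its predecessors. The snippet below is only a minimal, dependency-free illustration of that reordering step, not how concatMapEager is implemented. The ReorderBuffer class, its push method, and the sample data are all hypothetical names made up for this example.

```typescript
// Hypothetical helper (not part of RxJS or rxjs-etc): releases indexed
// results strictly in index order, buffering any that arrive early.
class ReorderBuffer<T> {
  private nextIndex = 0;
  private pending: { [index: number]: T } = {};

  // Accept the result of request `index` and return every result that
  // can now be released, in order (possibly an empty array).
  push(index: number, value: T): T[] {
    this.pending[index] = value;
    const ready: T[] = [];
    while (this.nextIndex in this.pending) {
      ready.push(this.pending[this.nextIndex]);
      delete this.pending[this.nextIndex];
      this.nextIndex++;
    }
    return ready;
  }
}

// Simulated completion order: request 1 finishes first, then 0, then 2.
const buffer = new ReorderBuffer<string[]>();
const released: string[][] = [];
released.push(...buffer.push(1, ["b1", "b2"])); // held back: index 0 is still pending
released.push(...buffer.push(0, ["a1"]));       // releases index 0, then the buffered index 1
released.push(...buffer.push(2, ["c1"]));       // releases index 2 immediately

// Concatenate the released arrays, mirroring the scan step in the question.
const flattened = released.reduce<string[]>((acc, values) => [...acc, ...values], []);
console.log(flattened);
```

In an actual pipeline, one way to wire this up (again, an assumption rather than a tested recipe) would be to use mergeMap((_, index) => fetch(index).pipe(map(values => ({ index, values })))) so the requests run in parallel, and then feed each { index, values } pair through such a buffer before the scan. Note that the buffer grows while an early request is slow, so an unbounded source with one stuck request would hold everything behind it.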