Tags: rxjs, reactive, declarative, angular-signals

How to Retrieve an Array at the End of an Observable Without Using toArray() in Angular/RxJS?


I have an observable enriched_sns$ which I'm enriching by performing operations on each emitted item using concatMap, and then combining the results into an array using toArray(). However, I'm curious if there's an alternative approach to achieve the same result without using toArray().

Here's my current code:

private enriched_sns$ = toObservable(this.sn_results).pipe(
  concatMap((results) => results as StreamNotificationResult[]),
  concatMap((notification) => {
    const user_id = notification.activities[0].actor.id;
    return zip(of(notification), this.read_user_data_service.readUserData(user_id));
  }),
  map(([notification, user]) => {
    return {
      notification,
      user: user,
    };
  }),
  toArray(),
);

Is there a way to obtain an array of emitted items at the end of the observable without resorting to toArray()? Any help or alternative approaches would be greatly appreciated. Thank you!


Solution

  • You need toArray() in your example because you take the array emitted by your signal and then emit each of its elements individually.

    Instead of using concatMap to emit the elements one at a time, you could map them to an array of observables and pass that array to forkJoin, which subscribes to all of them and emits their results as a single array once they have all completed:

    private enriched_sns$ = toObservable(this.sn_results).pipe(
      concatMap((results: StreamNotificationResult[]) => forkJoin(
        results.map(n => this.read_user_data_service.readUserData(n.activities[0].actor.id).pipe(
          map(user => ({ notification: n, user }))
        ))
      )),
    );
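
    To see the shape of the forkJoin result outside Angular, here is a minimal runnable sketch, assuming RxJS 7.2 or later (operators imported from 'rxjs'). The interfaces, the readUserData stub, and the enrich helper are hypothetical stand-ins for the types and service in the question, and from(batches) stands in for toObservable(this.sn_results). It also guards the empty-array case, because forkJoin completes without emitting when it is given an empty array.

    ```typescript
    import { Observable, concatMap, forkJoin, from, map, of } from 'rxjs';

    // Hypothetical stand-ins for the types used in the question.
    interface User { id: string; name: string; }
    interface StreamNotificationResult { activities: { actor: { id: string } }[]; }
    interface EnrichedNotification { notification: StreamNotificationResult; user: User; }

    // Stub for read_user_data_service.readUserData: emits one user, then completes.
    function readUserData(id: string): Observable<User> {
      return of({ id, name: `user-${id}` });
    }

    // Hypothetical helper: turns one array of notifications into one enriched array.
    function enrich(results: StreamNotificationResult[]): Observable<EnrichedNotification[]> {
      // forkJoin([]) completes without emitting, so emit an empty array explicitly.
      if (results.length === 0) {
        const empty: EnrichedNotification[] = [];
        return of(empty);
      }
      return forkJoin(
        results.map(n =>
          readUserData(n.activities[0].actor.id).pipe(
            map(user => ({ notification: n, user })),
          ),
        ),
      );
    }

    // Stand-in for toObservable(this.sn_results): two signal values, the second one empty.
    const batches: StreamNotificationResult[][] = [
      [{ activities: [{ actor: { id: 'a' } }] }, { activities: [{ actor: { id: 'b' } }] }],
      [],
    ];

    const emitted: EnrichedNotification[][] = [];
    from(batches).pipe(concatMap(enrich)).subscribe(arr => emitted.push(arr));

    // One array per source emission, with no toArray() involved.
    console.log(emitted.map(arr => arr.map(e => e.user.name)));
    ```

    Because the array is produced per source emission, the result does not depend on the outer observable completing.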
    

    There is one important difference: forkJoin subscribes to all of the observables at the same time, not one at a time. If you need to limit concurrency, you could check out the rxjs-etc package, whose forkJoinConcurrent function lets you specify the maximum number of concurrent subscriptions:

    private enriched_sns$ = toObservable(this.sn_results).pipe(
      concatMap((results: StreamNotificationResult[]) => forkJoinConcurrent(
        results.map(n => this.read_user_data_service.readUserData(n.activities[0].actor.id).pipe(
          map(user => ({ notification: n, user }))
        )),
        1, // <-- limit to one inner subscription at a time
      )),
    );
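
    If adding a dependency is not an option, a similar limit can be sketched with core operators alone, since mergeMap accepts a concurrency argument. This is only an illustration, assuming RxJS 7.2 or later and assuming each lookup emits exactly once. enrichWithLimit and the interfaces are hypothetical names and are not part of RxJS or rxjs-etc.

    ```typescript
    import { Observable, from, map, mergeMap, of, reduce } from 'rxjs';

    // Hypothetical stand-ins for the types used in the question.
    interface User { id: string; name: string; }
    interface StreamNotificationResult { activities: { actor: { id: string } }[]; }
    interface EnrichedNotification { notification: StreamNotificationResult; user: User; }
    interface Indexed { index: number; item: EnrichedNotification; }

    // Illustrative helper: enriches one array with at most `concurrency` lookups in flight.
    function enrichWithLimit(
      results: StreamNotificationResult[],
      readUserData: (id: string) => Observable<User>,
      concurrency: number,
    ): Observable<EnrichedNotification[]> {
      return from(results).pipe(
        // The second argument to mergeMap caps the number of active inner subscriptions.
        mergeMap(
          (notification, index) =>
            readUserData(notification.activities[0].actor.id).pipe(
              map((user): Indexed => ({ index, item: { notification, user } })),
            ),
          concurrency,
        ),
        // Store each result at its input index so the output order matches the input,
        // even when lookups finish out of order. With a seed, reduce emits [] for empty input.
        reduce((acc: EnrichedNotification[], { index, item }: Indexed) => {
          const next = acc.slice();
          next[index] = item;
          return next;
        }, [] as EnrichedNotification[]),
      );
    }

    // Usage with a stubbed lookup that emits one user and completes.
    const sample: StreamNotificationResult[] = [
      { activities: [{ actor: { id: 'a' } }] },
      { activities: [{ actor: { id: 'b' } }] },
      { activities: [{ actor: { id: 'c' } }] },
    ];
    const stubReadUserData = (id: string): Observable<User> => of({ id, name: `user-${id}` });

    let enriched: EnrichedNotification[] = [];
    enrichWithLimit(sample, stubReadUserData, 1).subscribe(result => (enriched = result));
    console.log(enriched.map(e => e.user.name));
    ```

    In the pipeline from the question, this helper would take the place of the forkJoinConcurrent call inside the outer concatMap. The inner from(results) completes on its own, so the array is emitted once per signal value.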
    

    Note that zip/of is not necessary at all. If you keep your original approach with toArray(), you can simply add a .pipe to the inner observable and map its emission to your desired shape:

    private enriched_sns$ = toObservable(this.sn_results).pipe(
      concatMap((results: StreamNotificationResult[]) => results),
      concatMap(notification => {
        const user_id = notification.activities[0].actor.id;
        return this.read_user_data_service.readUserData(user_id).pipe(
          map(user => ({ notification, user }))
        );
      }),
      toArray(),
    );