RXJS Wait for All Observables to Complete and Return Results


I'm trying to create an Rx stream that executes a list of XHR calls asynchronously and then waits for all of them to complete before moving on to the next step.

To help explain, this is roughly how it could be written in plain JS:

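try {
    // start every request in parallel and wait until all of them have finished
    // (treating get() as if it returned a promise, for illustration only)
    await Promise.all(
        requests.map(r => angularHttpService.get(`/foo/bar/${r}`))
    );
} catch (e) { throw e }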

// do something

This is the code I tried, but it runs the requests individually and doesn't wait for all of them to complete before proceeding. (This is an NgRx Effect stream, so it is slightly different from vanilla Rx.)

mergeMap(
        () => this.requests, concatMap((resqests) => from(resqests))),
        (request) =>
            this.myAngularHttpService
                .get(`foo/bar/${request}`)
                .pipe(catchError(e => of(new HttpError(e))))
    ),
    switchMap(res => new DeleteSuccess())

Solution

  • You can use forkJoin. It waits for every source observable to complete and then emits a single array containing the last value emitted by each of them. The following is an example from the linked documentation:

        // RxJS 6+ import paths (RxJS 5.5 used 'rxjs/observable/forkJoin' and 'rxjs/observable/of')
        import { forkJoin, of } from 'rxjs';
        import { mergeMap } from 'rxjs/operators';
        
        const myPromise = val =>
          new Promise(resolve =>
            setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
          );
        
        const source = of([1, 2, 3, 4, 5]);
        //emit array of all 5 results
        // forkJoin accepts an array of inputs (promises are fine), so no spread is needed
        const example = source.pipe(mergeMap(q => forkJoin(q.map(myPromise))));
        /*
          output:
          [
           "Promise Resolved: 1",
           "Promise Resolved: 2",
           "Promise Resolved: 3",
           "Promise Resolved: 4",
           "Promise Resolved: 5"
          ]
        */
        const subscribe = example.subscribe(val => console.log(val));
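
    Applied to the question, a minimal sketch could look like the following. It assumes RxJS 6 or later. `fakeHttp` and the `DELETE_SUCCESS` action object are hypothetical stand-ins for `myAngularHttpService` and `DeleteSuccess`, and the second request is made to fail on purpose to show `catchError` keeping the rest of the batch alive.

    ```typescript
    import { forkJoin, of, Observable } from 'rxjs';
    import { catchError, map } from 'rxjs/operators';

    // Hypothetical stand-in for the Angular HTTP service from the question.
    // The request for id 2 fails so the error handling path is exercised.
    const fakeHttp = {
      get: (url: string): Observable<string> =>
        url.endsWith('/2')
          ? new Observable<string>(sub => sub.error(new Error(`failed: ${url}`)))
          : of(`response from ${url}`),
    };

    const requests = [1, 2, 3];

    // One observable per request. Errors are converted into ordinary values,
    // otherwise a single failure would make the whole forkJoin error out.
    const calls = requests.map(r =>
      fakeHttp.get(`/foo/bar/${r}`).pipe(
        catchError((e: Error) => of(`error: ${e.message}`))
      )
    );

    const emitted: Array<{ type: string; payload: string[] }> = [];

    // forkJoin emits exactly once, after every call has completed.
    forkJoin(calls)
      .pipe(map(all => ({ type: 'DELETE_SUCCESS', payload: all })))
      .subscribe(action => emitted.push(action));
    ```

    Inside an effect, the same forkJoin call would sit inside a flattening operator such as switchMap, and the last step would use map rather than switchMap, because `new DeleteSuccess()` is a plain value and not an observable. Note that forkJoin completes without emitting anything when it is given an empty array, so an empty request list may need separate handling.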
    
    

    There is also this nice recipe by Peter B Smith, which also uses forkJoin for the same purpose. I'll copy/paste its content below:


    Copied from: https://gist.github.com/peterbsmyth/ce94c0a5ddceb99bab24a761731d1f07


    Making chained API Calls using @ngrx/Effects

    Purpose

    This recipe is useful for cooking up chained API calls as a result of a single action.

    Description

    In the example below, a single action called POST_REPO is dispatched. Its intention is to create a new repository on GitHub and then update the README with new data once the repository exists. For this to happen, 4 calls to the GitHub API are necessary:

    1. POST a new repository
    2. GET the master branch of the new repository
    3. GET the files on the master branch
    4. PUT the README.md file

    The POST_REPO action's payload contains payload.repo, with the information needed for API call 1. The response from API call 1 is necessary for API call 2. The response from API call 2 is necessary for API call 3. The response from API call 3 and payload.file, which has the information needed to update the README.md file, are necessary for API call 4.

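    Using Observable.forkJoin makes this possible: each step pairs the values gathered so far (wrapped with Observable.of) with the next API call, so later steps still have access to them.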

    Example

    import { Injectable } from '@angular/core';
    import { Effect, Actions } from '@ngrx/effects';
    import { Action } from '@ngrx/store';
    import { Observable } from 'rxjs/Observable';
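    // RxJS 5 style: patch the static creators and operators used below onto Observable
    import 'rxjs/add/observable/forkJoin';
    import 'rxjs/add/observable/of';
    import 'rxjs/add/operator/map';
    import 'rxjs/add/operator/switchMap';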
    import { handleError } from './handleError';
    
    
    import { GithubService } from '../services/github.service';
    import * as githubActions from '../actions/github';
    
    @Injectable()
    export class GitHubEffects {
      @Effect()
      postRepo$: Observable<Action> = this.actions$
        .ofType(githubActions.POST_REPO)
        .map((action: githubActions.PostRepo) => action.payload)
        // return the payload and POST the repo
        .switchMap((payload: any) => Observable.forkJoin([
          Observable.of(payload),
          this.githubService.postRepo(payload.repo)
        ]))
        // return the repo and the master branch as an array
        .switchMap((data: any) => {
          const [payload, repo] = data;
          return Observable.forkJoin([
            Observable.of(payload),
            Observable.of(repo),
            this.githubService.getMasterBranch(repo.name)
          ]);
        })
        // return the payload, the repo, and get the sha for README
        .switchMap((data: any) => {
          const [payload, repo, branch] = data;
          return Observable.forkJoin([
            Observable.of(payload),
            Observable.of(repo),
            this.githubService.getFiles(repo.name, branch)
              .map((files: any) => files.tree
                .filter(file => file.path === 'README.md')
                .map(file => file.sha)[0]
              )
          ]);
        })
        // update README with data from payload.file
        .switchMap((data: any) => {
          const [payload, repo, sha] = data;
          payload.file.sha = sha;
          return this.githubService.putFile(repo.name, payload.file);
        });
    
      constructor(
        private actions$: Actions,
        private githubService: GithubService,
      ) {}
    }
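
    For newer RxJS versions (6.5 and later), where operators are no longer patched onto Observable, the same chain can be sketched with pipe. This is an illustration under stated assumptions and not part of the original recipe: the `github` object is a hypothetical synchronous stub standing in for GithubService, and the `RepoFile` type, file names, and sha values are made up for the example.

    ```typescript
    import { forkJoin, of } from 'rxjs';
    import { map, switchMap } from 'rxjs/operators';

    interface RepoFile { content: string; sha?: string; }

    // Hypothetical stub standing in for GithubService; real methods would issue HTTP calls.
    const github = {
      postRepo: (repo: { name: string }) => of({ name: repo.name }),
      getMasterBranch: (repoName: string) => of(`${repoName}/master`),
      getFiles: (_repoName: string, _branch: string) =>
        of({ tree: [
          { path: 'LICENSE', sha: 'def456' },
          { path: 'README.md', sha: 'abc123' },
        ] }),
      putFile: (repoName: string, file: RepoFile) => of({ repo: repoName, file }),
    };

    const file: RepoFile = { content: '# Demo' };
    const payload = { repo: { name: 'demo' }, file };

    const saved: Array<{ repo: string; file: RepoFile }> = [];

    of(payload).pipe(
      // 1. POST the repo, carrying the payload along
      switchMap(p => forkJoin([of(p), github.postRepo(p.repo)])),
      // 2. GET the master branch of the new repo
      switchMap(([p, repo]) =>
        forkJoin([of(p), of(repo), github.getMasterBranch(repo.name)])),
      // 3. GET the files on that branch and pick the sha of README.md
      switchMap(([p, repo, branch]) =>
        forkJoin([
          of(p),
          of(repo),
          github.getFiles(repo.name, branch).pipe(
            map(files => files.tree
              .filter(f => f.path === 'README.md')
              .map(f => f.sha)[0])
          ),
        ])),
      // 4. PUT the README, copying the file instead of mutating the payload
      switchMap(([p, repo, sha]) => github.putFile(repo.name, { ...p.file, sha }))
    ).subscribe(result => saved.push(result));
    ```

    Passing the inputs to forkJoin as an array literal lets TypeScript infer a tuple type, so the destructured values in each step keep their own types and the `any` annotations from the original are not needed.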