Angular 6: How to make a set of service calls in parallel and process Data once all of them are completed


I am working on an Angular 6+ web app using RxJS 6+.

There is a set of services (all different, each making a different HTTP call). Basically, these services are used to initialize the app.

My requirement is to call these services in parallel and wait until all of them have finished. Once they have, I need to process the responses. I have read a lot about this and found that forkJoin and combineLatest may help, so I implemented it as follows:

 getInitialData() {

 this._authService.openCorsConnection().subscribe(res => { 
  forkJoin(
    this._authService.x(),
    this._tService.getSystemDate(),
    this._tService.getTemp(),
    this._tService.getSettings()
  ).pipe(
    catchError( err => {
      console.log('gor error',err);
      if (err.status == 401) {
          // this._router.navigateByUrl('/unauthorize');
          return EMPTY;
      } else {
          return throwError(err);
      }
 }),
    map(([x,y,z,p])=>{
           console.log('sucesss',{x,y,z,p});          
            return {x,y,z,p}
    }),
  finalize(()=>{
        console.log('Executing Finally');
        processData();

  })
  );
});
 }

But none of these calls are executed. Here is one of the services:

x(): Observable<any> {
  const xUrl = EnvironmentConfiguration.endPointConfig.xServiceEndpoint;
  let serviceMethod: string = 'x';
  let requestObj: any = this._utilHelperSvc.getRequestObject(xUrl, { "y": "" }, serviceMethod);
  return this._http.post<any>(requestObj.url, { "pid": "" }, requestObj)
    .pipe(
      catchError(this.handleError('x', {}))
    );
}

Do I need to modify the services? I am not able to figure out how to solve this.

Can anybody please suggest a better solution or a different approach?

Thanks.


Solution

  • TL;DR

    Use zip or forkJoin.

    Explanation

    You don't need to wait for forkJoin before processing the responses; it works the other way around. Attach the processing to each source first, pass the prepared Observables to forkJoin, and simply wait for completion.

    Here is what I mean:

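    import { forkJoin, pipe, timer } from 'rxjs';
    import { map, tap } from 'rxjs/operators';

    // Three independent "jobs" that emit once after 1, 2 and 3 seconds
    const process1$ = timer(1000);
    const process2$ = timer(2000);
    const process3$ = timer(3000);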
    
    const doSthg1 = pipe(
      tap(() => console.log('Process 1 has finished')),
      map(() => 'Process 1 has finished')
    )
    
    const doSthg2 = pipe(
      tap(() => console.log('Process 2 has finished')),
      map(() => 'Process 2 has finished')
    )
    
    const doSthg3 = pipe(
      tap(() => console.log('Process 3 has finished')),
      map(() => 'Process 3 has finished')
    )
    
    forkJoin(doSthg1(process1$), doSthg2(process2$), doSthg3(process3$)).subscribe(() => {
      console.log('Now I am complete');
    });
    

    This works as long as your processes are not chained, i.e. the input of one does not depend on the output of another.
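
To process the responses themselves, note that forkJoin emits a single array holding the last value of each source once every source has completed. Below is a minimal sketch of that behaviour. The three sources and the `InitialData` shape are hypothetical stand-ins for the real HTTP calls, and `of` is used so that everything emits synchronously.

```typescript
import { forkJoin, of, Observable } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical stand-ins for independent HTTP calls: each emits once, then completes.
const systemDate$: Observable<string> = of('2018-06-01');
const temp$: Observable<number> = of(21);
const settings$: Observable<string[]> = of(['dark-theme']);

// Hypothetical shape for the combined result.
interface InitialData {
  systemDate: string;
  temp: number;
  settings: string[];
}

const received: InitialData[] = [];

// forkJoin subscribes to all sources and emits once, after all of them complete.
forkJoin(systemDate$, temp$, settings$).pipe(
  map(([systemDate, temp, settings]) => ({ systemDate, temp, settings }))
).subscribe(data => {
  received.push(data);
  console.log('All sources completed', data);
});
```

The argument-list form of forkJoin matches the RxJS 6 style used in this answer; newer RxJS versions prefer passing an array of sources.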

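    Why isn't my code working?

    Because you never subscribe to forkJoin: it is created inside the subscribe callback of openCorsConnection(), but an Observable does nothing until something subscribes to it. One solution is to use concatMap to switch from this._authService.openCorsConnection() to the forkJoin Observable, then subscribe to the resulting chain: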

    getInitialData(){
      this._authService.openCorsConnection().pipe(
        concatMap(() => forkJoin(this._authService.x(),
          this._tService.getSystemDate(),
          this._tService.getTemp(),
          this._tService.getSettings()
        )),
        catchError(err => {
          console.log('got error', err);
          if (err.status === 401) {
            // Complete silently; processData() is not called in this case
            return EMPTY;
          } else {
            return throwError(err);
          }
        })
      ).subscribe(() => processData());
    }
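
Since the goal is to process the responses, the same chain can also hand them to the handler. The following is only a sketch under stated assumptions: the four service functions are hypothetical synchronous stubs replacing the real HTTP-backed methods, and this `processData` signature is invented for illustration.

```typescript
import { EMPTY, Observable, forkJoin, of, throwError } from 'rxjs';
import { catchError, concatMap, map } from 'rxjs/operators';

// Hypothetical stubs standing in for the real service methods.
const openCorsConnection = (): Observable<boolean> => of(true);
const getSystemDate = (): Observable<string> => of('2018-06-01');
const getTemp = (): Observable<number> => of(21);
const getSettings = (): Observable<string[]> => of(['a', 'b']);

interface InitResponses {
  date: string;
  temp: number;
  settings: string[];
}

const processed: InitResponses[] = [];

// Hypothetical handler: receives all responses in one object.
function processData(data: InitResponses): void {
  processed.push(data);
}

function getInitialData(): void {
  openCorsConnection().pipe(
    // Start the parallel calls only after the connection call has emitted.
    concatMap(() => forkJoin(getSystemDate(), getTemp(), getSettings())),
    map(([date, temp, settings]) => ({ date, temp, settings })),
    // On 401, complete without emitting; rethrow anything else.
    catchError(err => (err.status === 401 ? EMPTY : throwError(err)))
  ).subscribe(data => processData(data));
}

getInitialData();
```

Because the subscriber's next callback only runs when forkJoin emits, `processData` is skipped entirely when the 401 branch returns EMPTY.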
    

    zip

    You could use zip instead of forkJoin if:

    1. all processes emit only once and then complete, for example HTTP GET or POST requests.
    2. you want to get the joined outputs from your sources.

    Implementation:

    zip(doSthg1(process1$), doSthg2(process2$), doSthg3(process3$))
      .subscribe(([x, y, z]) => {
        console.log(`x=${x}, y=${y}, z=${z}`);
      });
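
The "emit only once" condition matters because the two operators differ as soon as a source emits more than once. Here is a small sketch of that difference, using hypothetical synchronous sources built with `of`: zip pairs values by index, while forkJoin keeps only the last value of each source.

```typescript
import { forkJoin, of, zip } from 'rxjs';

// Hypothetical sources that emit several values before completing.
const letters$ = of('a', 'b', 'c');
const numbers$ = of(1, 2, 3);

const zipped: Array<[string, number]> = [];
const joined: Array<[string, number]> = [];

// zip emits one tuple per index: first with first, second with second, ...
zip(letters$, numbers$).subscribe(pair => zipped.push(pair));

// forkJoin emits once, with the last value of each source.
forkJoin(letters$, numbers$).subscribe(pair => joined.push(pair));

console.log(zipped); // [['a', 1], ['b', 2], ['c', 3]]
console.log(joined); // [['c', 3]]
```

For single-emission sources such as HTTP requests, both produce the same single tuple, which is why either works for the original question.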
    
