Search code examples
javascriptangularrxjsobservable

Angular subscribe to multiple asynchronous http calls that partly depend on each other and return arrays


I'm fairly new to angular and javascript and building my first angular dashboard app that queries the Azure Devops api to get test results:

  1. Query the API to get a filtered list of release definitions (about 100)
  2. For each release definition get the latest release
  3. For each release get a collection of all the test runs from this release.
  4. Show a table of all results (each release definition is a row with the test results as expandable table) as soon as the first set of test results are received.

I managed to get this working with nested subscriptions on observables (see below), but I understand this should be avoided and is better done with something likt mergeMap / switchMap and/or forkJoin. Been struggling with that for days, but no luck yet.

And then there's challenge #2: a second stream of data should be added to this: pipelines. Following the same recipe: get a list of pipelines, for each the latest pipeline run, for each of that all the test runs. Both data streams can/should be obtained separately and asynchronously and as soon as one of them has fetched the first set of test runs it can be shown on the dashboard.

How to accomplish this??

My working solution for release definitions only using nested subscriptions:

ngOnInit(): void {   
    this.router.paramMap.pipe(takeUntil(this.ngUnsubscribe))
    .subscribe(params => {
      this.teamToFilterOn = params.get('team');
      this.apiService.getReleaseDefinitions(this.teamToFilterOn as string)
      .pipe(takeUntil(this.ngUnsubscribe))
      .subscribe((releaseDefinitions: any)=> {
        if (releaseDefinitions.length === 0) {
          this.isLoading = false
        }
        releaseDefinitions.forEach((releaseDefinition: any) => {
          if (releaseDefinition.lastRelease) {
            this.apiService.getRelease(releaseDefinition.lastRelease.id)
            .pipe(takeUntil(this.ngUnsubscribe))
            .subscribe((info: PipelineOrReleaseInfo) => {
              if (info) {
                this.apiService.getTestRunsByRelease(info.releaseId)
                .pipe(takeUntil(this.ngUnsubscribe))
                .subscribe((testruns: any) => {    
                  this.isLoading = false;              
                  this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];             
                  this.dataSource.data = this.results;
                });
              }              
            });
          }
        });            
      });
    });
  }   

First try using forkJoin but stuck on how to proceed. Also not sure if this is correct, because forkJoin seems to wait until both observables are complete, but instead as soon as one of them has a result it should proceed to loop over the results and do the remaining calls.

ngOnInit(): void {   
    this.router.paramMap.pipe(takeUntil(this.ngUnsubscribe))
    .subscribe(params => {
      this.teamToFilterOn = params.get('team');      
      
      let releaseDefQuery =  this.apiService.getReleaseDefinitions(this.teamToFilterOn as string)
      let pipelineDefQuery = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string)

      forkJoin([releaseDefQuery, pipelineDefQuery]).subscribe(definitions => {
        let releaseDefinitions = definitions[0];
        let pipelineDefinitions = definitions[1];

        releaseDefinitions.forEach((releaseDefinition: any) => {
          if (releaseDefinition.lastRelease) {
            this.apiService.getRelease(releaseDefinition.lastRelease.id)
            .pipe(takeUntil(this.ngUnsubscribe))
            .subscribe((info: PipelineOrReleaseInfo) => {
...

EDIT: For clarity also added the pipelines flow, resulting in the same objects info of type PipelineOrReleaseInfo and testruns. As soon as one of the flows (releases or pipelines) has these two objects completed it can be shown. So these two flows can/should be merged at some point?

pipelineDefinitions.forEach((pipelineDefinition: any) => {
    this.apiService.getLatestPipelineRun(pipelineDefinition.id)
    .pipe(takeUntil(this.ngUnsubscribe))
    .subscribe((info: PipelineOrReleaseInfo) => {
      if (info) {
        this.apiService.getTestRunsByPipeline(info.pipelineRunId)
        .pipe(takeUntil(this.ngUnsubscribe))
        .subscribe((testruns: any) => {    
          this.isLoading = false;              
          this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];             
          this.dataSource.data = this.results;
        });
      }              
    });
}

EDIT: fully working code from accepted answer:

ngOnInit(): void {
    this.teamToFilterOn = this.router.snapshot.paramMap.get('team');    

    const releaseResults$: Observable<any> = this.apiService.getReleaseDefinitions(this.teamToFilterOn as string).pipe(
      mergeMap(releaseDefs => releaseDefs), // Turns Observable<[]> into Observable<>
      filter((releaseDef: any) => releaseDef.lastRelease), // Take only releaseDefs with a lastRelease
      mergeMap((releaseDef: any) => this.apiService.getRelease(releaseDef.lastRelease.id)),
      filter(releaseInfo => !!releaseInfo), // Continue only when release info is returned
      mergeMap((releaseInfo: PipelineOrReleaseInfo) => this.apiService.getTestRunsByRelease(releaseInfo.releaseId)
      .pipe(map(testruns => ({ testruns, info: releaseInfo }))))
    );

    const pipelineResults$: Observable<any> = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string).pipe(
      mergeMap(pipelineDefs => pipelineDefs), // Turns Observable<[]> into Observable<def>
      mergeMap((pipelineDef: any) => this.apiService.getLastPipelineRun(pipelineDef.id)),
      filter(pipelineInfo => !!pipelineInfo), // Continue only when pipeline info is returned
      mergeMap((pipelineInfo: PipelineOrReleaseInfo) => this.apiService.getTestRunsByPipeline(pipelineInfo.pipelineRunId)
      .pipe(map(testruns => ({ testruns, info: pipelineInfo }))))
    );
   
    merge(releaseResults$, pipelineResults$)
    .pipe(takeUntil(this.ngUnsubscribe))
    .subscribe(({ testruns, info }) => {
      this.isLoading = false;              
      this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];             
      this.dataSource.data = this.results;
    });
  }

Solution

  • Like any kind of Stream (e.g. Promises), when you see nesting in Observables you might want to take a step back to see if it's really warranted.

    Let's examine your solution bit by bit.

    Our starting point is:

    this.router.paramMap.pipe(takeUntil(this.ngUnsubscribe))
    

    Then you subscribe, but within that subscribe you do observable operations on the given data, this strongly suggests you should be pipeing an operation instead, and then subscribe on the final result.

    In this case you want to map your params to some Observable. You also might benefit from the "interrupt early" behavior that switchMap offers. Otherwise there's also mergeMap as a potential option if you don't want to "interrupt early" (it used to be more appropriately named flatMap).

    We'll add a filter and map for good measure, to ensure we have the team param, and to pluck it out (since we don't need the rest).

    this.router.paramMap.pipe(
        takeUntil(this.ngUnsubscribe),
        filter(params => params.has("team"))
        map(params => params.get("team"))
        switchMap(team => {
          this.teamToFilterOn = team as string;
          // We'll dissect the rest
        })
    ) // [...]
    

    Then comes the part with what you want to do with that team.

    You have multiple "tasks" that rely on the same input, and you want them both at the same time, so reaching for forkJoin is a good call. But there's also combineLatest that does something similar, but combine the results "step by step" instead.

    You use the word "latest" for both your tasks, so we'll indeed reach for combineLatest instead:

    const releaseDef$ = // [...]
    const pipelineDef$ = // [...]
    return combineLatest([releaseDef$, pipelineDef$]);
    

    Now let's dissect these two operations.

    From what I gather, you're only interested in releases that have a lastRelease. You also don't want to "switch" when a new one comes in, you want them all, let's encode that:

    const releaseDef$ = this.apiService.getReleaseDefinitions(this.teamToFilterOn as string).pipe(
        mergeMap(releaseDefs => releaseDefs), // Turns Observable<def[]> into Observable<def>
        filter(releaseDef => releaseDef.lastRelease),
        mergeMap(lastReleaseDef => this.apiService.getRelease(releaseDefinition.lastRelease.id)),
        filter(info => !!info)
        mergeMap(info => this.apiService.getTestRunsByRelease(info.releaseId).pipe(map(testruns => ({ testruns, info }))),
    )
    

    You'll notice I also pipe into the result of getTestRunsByRelease. That is because unlike Promises, we don't have an alternative syntax like async/await that help with keeping previous state in an easy way. Instead we have to rely on the monoid operation map from within our monad operation flatMap and drag the previous results along. For Promises, both map and flatMap are .then. For Observables they are respectively map and mergeMap.

    We apply a very similar transformation to your pipelines:

    const pipelineDef$ = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string)
      .pipe(
        mergeMap(pipelineDefs => pipelineDefs), // Turns Observable<def[]> into Observable<def>
        mergeMap(pipelineDef => this.apiService.getLatestPipelineRun(pipelineDef.id)),
        filter(info => !!info),
        mergeMap(info => this.apiService.getTestRunsByPipeline(info.pipelineRunId).pipe(map(testruns => ({ testruns, info }))),
      );
    

    Here if you need to operate independently on the results of releaseDef$ and pipelineDef$ you can use tap.

    Note that these could easily be extracted into two methods.

    As the end operation is the same for both, and their results have the same shape, you can use merge instead of combineLatest to merge the two observables into one that emits all values of both as they come in (instead of combining and emitting the latest value of each in an array):

    return merge(releaseDef$, pipelineDef$);
    

    To wrap this up, let's put it all together:

    ngOnInit(): void {
      this.router.paramMap.pipe(
        takeUntil(this.ngUnsubscribe),
        filter(params => params.has("team"))
        map(params => params.get("team"))
        switchMap(team => {
          this.teamToFilterOn = team as string;
          
          const releaseDef$ = this.apiService.getReleaseDefinitions(this.teamToFilterOn as string).pipe(
            mergeMap(releaseDefs => releaseDefs), // Turns Observable<def[]> into Observable<def>
            filter(releaseDef => releaseDef.lastRelease),
            mergeMap(lastReleaseDef => this.apiService.getRelease(releaseDefinition.lastRelease.id)),
            filter(info => !!info)
            mergeMap(info => this.apiService.getTestRunsByRelease(info.releaseId).pipe(map(testruns => ({ testruns, infos })))),
          );
    
          const pipelineDef$ = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string)
            .pipe(
              mergeMap(pipelineDefs => pipelineDefs), // Turns Observable<def[]> into Observable<def>
              mergeMap(pipelineDef => this.apiService.getLatestPipelineRun(pipelineDef.id)),
              filter(info => !!info),
              mergeMap(info => this.apiService.getTestRunsByPipeline(info.pipelineRunId).pipe(map(testruns => ({ testruns, info }))),
            )
    
          return merge(releaseDef$, pipelineDef$);
        })
      ).subscribe(({ testruns, infos }) => {
        this.isLoading = false;              
        this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];             
        this.dataSource.data = this.results;
      });
    }
    

    You'll notice I only used on takeUntil(this.ngUnsubscribe) as the "main" observable chain will stop with that, which means operation will stop as well.

    If you're unsure or encounter issues, you can still sprinkle them as the very first argument of each .pipe.