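I'm fairly new to Angular and JavaScript and am building my first Angular dashboard app that queries the Azure DevOps API to get test results: get a list of release definitions, for each the last release, and for each of those all the test runs.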
I managed to get this working with nested subscriptions on observables (see below), but I understand this should be avoided and is better done with something like mergeMap / switchMap and/or forkJoin. I've been struggling with that for days, but no luck yet.
And then there's challenge #2: a second stream of data should be added to this: pipelines. It follows the same recipe: get a list of pipelines, for each the latest pipeline run, and for each of those all the test runs. Both data streams can/should be obtained separately and asynchronously, and as soon as one of them has fetched its first set of test runs it can be shown on the dashboard.
How can I accomplish this?
My working solution for release definitions only using nested subscriptions:
    ngOnInit(): void {
      this.router.paramMap.pipe(takeUntil(this.ngUnsubscribe))
        .subscribe(params => {
          this.teamToFilterOn = params.get('team');
          this.apiService.getReleaseDefinitions(this.teamToFilterOn as string)
            .pipe(takeUntil(this.ngUnsubscribe))
            .subscribe((releaseDefinitions: any) => {
              if (releaseDefinitions.length === 0) {
                this.isLoading = false;
              }
              releaseDefinitions.forEach((releaseDefinition: any) => {
                if (releaseDefinition.lastRelease) {
                  this.apiService.getRelease(releaseDefinition.lastRelease.id)
                    .pipe(takeUntil(this.ngUnsubscribe))
                    .subscribe((info: PipelineOrReleaseInfo) => {
                      if (info) {
                        this.apiService.getTestRunsByRelease(info.releaseId)
                          .pipe(takeUntil(this.ngUnsubscribe))
                          .subscribe((testruns: any) => {
                            this.isLoading = false;
                            this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];
                            this.dataSource.data = this.results;
                          });
                      }
                    });
                }
              });
            });
        });
    }
First try using forkJoin, but I'm stuck on how to proceed. I'm also not sure this is correct, because forkJoin seems to wait until both observables are complete, whereas I want to proceed as soon as one of them has a result: loop over its results and do the remaining calls.
    ngOnInit(): void {
      this.router.paramMap.pipe(takeUntil(this.ngUnsubscribe))
        .subscribe(params => {
          this.teamToFilterOn = params.get('team');
          let releaseDefQuery = this.apiService.getReleaseDefinitions(this.teamToFilterOn as string)
          let pipelineDefQuery = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string)
          forkJoin([releaseDefQuery, pipelineDefQuery]).subscribe(definitions => {
            let releaseDefinitions = definitions[0];
            let pipelineDefinitions = definitions[1];
            releaseDefinitions.forEach((releaseDefinition: any) => {
              if (releaseDefinition.lastRelease) {
                this.apiService.getRelease(releaseDefinition.lastRelease.id)
                  .pipe(takeUntil(this.ngUnsubscribe))
                  .subscribe((info: PipelineOrReleaseInfo) => {
                    ...
EDIT: For clarity I also added the pipelines flow, which results in the same objects: info (of type PipelineOrReleaseInfo) and testruns. As soon as one of the flows (releases or pipelines) has these two objects completed it can be shown. So these two flows can/should be merged at some point?
    pipelineDefinitions.forEach((pipelineDefinition: any) => {
      this.apiService.getLatestPipelineRun(pipelineDefinition.id)
        .pipe(takeUntil(this.ngUnsubscribe))
        .subscribe((info: PipelineOrReleaseInfo) => {
          if (info) {
            this.apiService.getTestRunsByPipeline(info.pipelineRunId)
              .pipe(takeUntil(this.ngUnsubscribe))
              .subscribe((testruns: any) => {
                this.isLoading = false;
                this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];
                this.dataSource.data = this.results;
              });
          }
        });
    });
EDIT: fully working code from accepted answer:
    ngOnInit(): void {
      this.teamToFilterOn = this.router.snapshot.paramMap.get('team');
      const releaseResults$: Observable<any> = this.apiService.getReleaseDefinitions(this.teamToFilterOn as string).pipe(
        mergeMap(releaseDefs => releaseDefs), // Turns Observable<def[]> into Observable<def>
        filter((releaseDef: any) => releaseDef.lastRelease), // Take only releaseDefs with a lastRelease
        mergeMap((releaseDef: any) => this.apiService.getRelease(releaseDef.lastRelease.id)),
        filter(releaseInfo => !!releaseInfo), // Continue only when release info is returned
        mergeMap((releaseInfo: PipelineOrReleaseInfo) => this.apiService.getTestRunsByRelease(releaseInfo.releaseId)
          .pipe(map(testruns => ({ testruns, info: releaseInfo }))))
      );
      const pipelineResults$: Observable<any> = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string).pipe(
        mergeMap(pipelineDefs => pipelineDefs), // Turns Observable<def[]> into Observable<def>
        mergeMap((pipelineDef: any) => this.apiService.getLastPipelineRun(pipelineDef.id)),
        filter(pipelineInfo => !!pipelineInfo), // Continue only when pipeline info is returned
        mergeMap((pipelineInfo: PipelineOrReleaseInfo) => this.apiService.getTestRunsByPipeline(pipelineInfo.pipelineRunId)
          .pipe(map(testruns => ({ testruns, info: pipelineInfo }))))
      );
      merge(releaseResults$, pipelineResults$)
        .pipe(takeUntil(this.ngUnsubscribe))
        .subscribe(({ testruns, info }) => {
          this.isLoading = false;
          this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];
          this.dataSource.data = this.results;
        });
    }
As with any kind of stream (e.g. Promises), when you see nesting in Observables you might want to take a step back to see if it's really warranted.
Let's examine your solution bit by bit.
Our starting point is:
this.router.paramMap.pipe(takeUntil(this.ngUnsubscribe))
Then you subscribe, but within that subscribe you do observable operations on the given data. This strongly suggests you should be piping an operation instead, and then subscribing to the final result.
In this case you want to map your params to some Observable. You also might benefit from the "interrupt early" behavior that switchMap offers. Otherwise there's also mergeMap as a potential option if you don't want to "interrupt early" (it used to be more appropriately named flatMap).
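To make the "interrupt early" difference concrete, here is a minimal sketch outside Angular. The Subjects are hypothetical stand-ins for pending HTTP calls (one per team); they are not part of the question's apiService.

```typescript
import { Subject } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

// Hypothetical stand-ins: each Subject plays the role of one pending request.
const requests: Record<string, Subject<string>> = {
  alpha: new Subject<string>(),
  beta: new Subject<string>(),
};

const team$ = new Subject<string>();
const switched: string[] = [];
const merged: string[] = [];

team$.pipe(switchMap(team => requests[team])).subscribe(v => switched.push(v));
team$.pipe(mergeMap(team => requests[team])).subscribe(v => merged.push(v));

team$.next('alpha');                  // the "alpha" request starts
team$.next('beta');                   // switchMap unsubscribes from "alpha" here
requests.alpha.next('alpha results'); // late answer: only mergeMap still listens
requests.beta.next('beta results');

// switched is now ['beta results']
// merged is now ['alpha results', 'beta results']
```

For a route parameter that selects what the dashboard shows, dropping the stale answer is usually what you want, which is why switchMap is suggested for the outer step.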
We'll add a filter and a map for good measure, to ensure we have the team param, and to pluck it out (since we don't need the rest).
    this.router.paramMap.pipe(
      filter(params => params.has("team")),
      map(params => params.get("team")),
      switchMap(team => {
        this.teamToFilterOn = team as string;
        // We'll dissect the rest
      }),
      takeUntil(this.ngUnsubscribe) // Kept last so it also stops the inner observables
    ) // [...]
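As a minimal sketch of that filter-then-map step outside Angular, plain Map objects stand in for the router's ParamMap. That substitution is an assumption made for illustration only; it works here because a Map also offers has() and get().

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical stand-ins for ParamMap emissions.
const params$ = of(
  new Map<string, string>([['team', 'alpha']]),
  new Map<string, string>(),                    // no "team" param: dropped by filter
  new Map<string, string>([['team', 'beta']]),
);

const teams: string[] = [];
params$.pipe(
  filter(params => params.has('team')),        // only continue when the param exists
  map(params => params.get('team') as string), // pluck the value, drop the rest
).subscribe(team => teams.push(team));

// teams is now ['alpha', 'beta']
```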
Then comes the part with what you want to do with that team.
You have multiple "tasks" that rely on the same input, and you want them both at the same time, so reaching for forkJoin is a good call. But there's also combineLatest, which does something similar but combines the results "step by step": once every source has emitted at least once, it emits again each time any of them emits, instead of emitting a single time when all have completed.
You use the word "latest" for both your tasks, so we'll indeed reach for combineLatest instead:
const releaseDef$ = // [...]
const pipelineDef$ = // [...]
return combineLatest([releaseDef$, pipelineDef$]);
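The difference between the two can be checked with a small sketch. The two Subjects below are hypothetical stand-ins for the release and pipeline queries, not the real service calls.

```typescript
import { Subject, combineLatest, forkJoin } from 'rxjs';

// Hypothetical sources standing in for the two queries.
const a$ = new Subject<number>();
const b$ = new Subject<string>();

const latest: (number | string)[][] = [];
const joined: (number | string)[][] = [];

combineLatest([a$, b$]).subscribe(pair => latest.push(pair));
forkJoin([a$, b$]).subscribe(pair => joined.push(pair));

a$.next(1);
b$.next('x');   // combineLatest emits [1, 'x'] now that both have a value
a$.next(2);     // combineLatest emits [2, 'x']

const joinedBeforeComplete = joined.length; // forkJoin has emitted nothing yet

a$.complete();
b$.complete();  // only now does forkJoin emit the last values: [2, 'x']
```

This matches the concern in the question: forkJoin holds everything back until both sources complete, while combineLatest reports intermediate states.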
Now let's dissect these two operations.
From what I gather, you're only interested in releases that have a lastRelease. You also don't want to "switch" when a new one comes in; you want them all. Let's encode that:
    const releaseDef$ = this.apiService.getReleaseDefinitions(this.teamToFilterOn as string).pipe(
      mergeMap(releaseDefs => releaseDefs), // Turns Observable<def[]> into Observable<def>
      filter(releaseDef => !!releaseDef.lastRelease), // Keep only definitions that have a lastRelease
      mergeMap(releaseDef => this.apiService.getRelease(releaseDef.lastRelease.id)),
      filter(info => !!info), // Continue only when release info is returned
      mergeMap(info => this.apiService.getTestRunsByRelease(info.releaseId).pipe(
        map(testruns => ({ testruns, info })) // Carry info along with its test runs
      )),
    );
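The first two steps of that chain (flattening the array, then filtering) can be tried in isolation. This is a sketch with made-up data; the ReleaseDef shape is assumed from how the question's code uses lastRelease.

```typescript
import { of } from 'rxjs';
import { filter, map, mergeMap } from 'rxjs/operators';

// Assumed shape, inferred from the question's use of releaseDefinition.lastRelease.id.
interface ReleaseDef { id: number; lastRelease?: { id: number } }

const defs: ReleaseDef[] = [
  { id: 1, lastRelease: { id: 11 } },
  { id: 2 },                          // never released: filtered out
  { id: 3, lastRelease: { id: 33 } },
];

const lastReleaseIds: number[] = [];
of(defs).pipe(
  mergeMap(list => list),             // one array emission becomes three item emissions
  filter(def => !!def.lastRelease),   // keep only definitions with a lastRelease
  map(def => def.lastRelease!.id),
).subscribe(id => lastReleaseIds.push(id));

// lastReleaseIds is now [11, 33]
```

Returning the array from mergeMap works because an array is itself a valid observable input, so each element is emitted separately.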
You'll notice I also pipe into the result of getTestRunsByRelease. That is because, unlike Promises, we don't have an alternative syntax like async/await that helps keep previous state around easily. Instead we have to rely on the functor operation map from within our monad operation flatMap and drag the previous results along. For Promises, both map and flatMap are .then. For Observables they are respectively map and mergeMap.
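A small sketch of that "drag the previous result along" pattern follows. Here getRelease and getTestRuns are hypothetical synchronous stand-ins built with of(), not the real apiService methods.

```typescript
import { Observable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

interface Info { releaseId: number }

// Hypothetical stand-ins for the two dependent API calls.
const getRelease = (id: number): Observable<Info> => of({ releaseId: id });
const getTestRuns = (releaseId: number): Observable<string[]> =>
  of([`run-${releaseId}-a`, `run-${releaseId}-b`]);

const paired: { info: Info; testruns: string[] }[] = [];

getRelease(7).pipe(
  // The inner map still sees `info` through the closure, so both values
  // can be emitted together instead of losing the first one.
  mergeMap(info => getTestRuns(info.releaseId).pipe(
    map(testruns => ({ info, testruns })),
  )),
).subscribe(result => paired.push(result));

// paired now holds one entry with info.releaseId 7 and both test run names
```

Had the map been placed in the outer pipe instead, info would no longer be in scope at that point.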
We apply a very similar transformation to your pipelines:
    const pipelineDef$ = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string).pipe(
      mergeMap(pipelineDefs => pipelineDefs), // Turns Observable<def[]> into Observable<def>
      mergeMap(pipelineDef => this.apiService.getLatestPipelineRun(pipelineDef.id)),
      filter(info => !!info), // Continue only when pipeline info is returned
      mergeMap(info => this.apiService.getTestRunsByPipeline(info.pipelineRunId).pipe(
        map(testruns => ({ testruns, info })) // Carry info along with its test runs
      )),
    );
Here, if you need to operate independently on the results of releaseDef$ and pipelineDef$, you can use tap.
Note that these could easily be extracted into two methods.
As the end operation is the same for both, and their results have the same shape, you can use merge instead of combineLatest to merge the two observables into one that emits all values of both as they come in (instead of combining and emitting the latest value of each in an array):
return merge(releaseDef$, pipelineDef$);
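As a quick illustration of how merge interleaves its inputs, here is a sketch in which two Subjects act as hypothetical stand-ins for releaseDef$ and pipelineDef$:

```typescript
import { Subject, merge } from 'rxjs';

// Hypothetical stand-ins for the two result streams.
const release$ = new Subject<string>();
const pipeline$ = new Subject<string>();

const seen: string[] = [];
merge(release$, pipeline$).subscribe(value => seen.push(value));

// Values are forwarded one by one, in arrival order, whichever source they come from.
pipeline$.next('pipeline 1');
release$.next('release 1');
pipeline$.next('pipeline 2');

// seen is now ['pipeline 1', 'release 1', 'pipeline 2']
```

Each result can therefore be pushed to the dashboard as soon as it arrives, without waiting for the other flow.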
To wrap this up, let's put it all together:
    // Needs: import { merge } from 'rxjs';
    //        import { filter, map, mergeMap, switchMap, takeUntil } from 'rxjs/operators';
    ngOnInit(): void {
      this.router.paramMap.pipe(
        filter(params => params.has("team")),
        map(params => params.get("team")),
        switchMap(team => {
          this.teamToFilterOn = team as string;
          const releaseDef$ = this.apiService.getReleaseDefinitions(this.teamToFilterOn as string).pipe(
            mergeMap(releaseDefs => releaseDefs), // Turns Observable<def[]> into Observable<def>
            filter(releaseDef => !!releaseDef.lastRelease),
            mergeMap(releaseDef => this.apiService.getRelease(releaseDef.lastRelease.id)),
            filter(info => !!info),
            mergeMap(info => this.apiService.getTestRunsByRelease(info.releaseId).pipe(map(testruns => ({ testruns, info })))),
          );
          const pipelineDef$ = this.apiService.getPipelineDefinitions(this.teamToFilterOn as string).pipe(
            mergeMap(pipelineDefs => pipelineDefs), // Turns Observable<def[]> into Observable<def>
            mergeMap(pipelineDef => this.apiService.getLatestPipelineRun(pipelineDef.id)),
            filter(info => !!info),
            mergeMap(info => this.apiService.getTestRunsByPipeline(info.pipelineRunId).pipe(map(testruns => ({ testruns, info })))),
          );
          return merge(releaseDef$, pipelineDef$);
        }),
        takeUntil(this.ngUnsubscribe), // Last, so it also unsubscribes the inner observables
      ).subscribe(({ testruns, info }) => {
        this.isLoading = false;
        this.results = [...this.results, { info: info, testruns: testruns, totals: this.calculateEnvironmentTotals(testruns.testRunResults)}];
        this.dataSource.data = this.results;
      });
    }
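You'll notice I only used one takeUntil(this.ngUnsubscribe): when the "main" observable chain stops, the inner operations stop as well. Note that this only holds when takeUntil is the last operator in the outer pipe; placed before switchMap, it completes the source but leaves an in-flight inner observable running.
If you're unsure or encounter issues, you can still add one at the end of each inner .pipe as well.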
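To check how the position of takeUntil interacts with switchMap, here is a minimal sketch. destroy$, source$ and inner$ are hypothetical Subjects standing in for ngUnsubscribe, paramMap and a pending API call.

```typescript
import { Subject } from 'rxjs';
import { switchMap, takeUntil } from 'rxjs/operators';

// Hypothetical stand-ins for ngUnsubscribe, paramMap and an in-flight request.
const destroy$ = new Subject<void>();
const source$ = new Subject<number>();
const inner$ = new Subject<string>();

const takeUntilFirst: string[] = [];
const takeUntilLast: string[] = [];

// takeUntil before switchMap: only the outer source is cut off.
source$.pipe(takeUntil(destroy$), switchMap(() => inner$))
  .subscribe(v => takeUntilFirst.push(v));

// takeUntil after switchMap: the whole chain, inner observable included, is cut off.
source$.pipe(switchMap(() => inner$), takeUntil(destroy$))
  .subscribe(v => takeUntilLast.push(v));

source$.next(1);                // both chains subscribe to inner$
inner$.next('before destroy');
destroy$.next();                // simulates the component being destroyed
inner$.next('after destroy');

// takeUntilFirst is now ['before destroy', 'after destroy']
// takeUntilLast is now ['before destroy']
```

With takeUntil as the last operator, a single one on the outer chain is enough to stop everything.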