I am working on Angular 6+ web-app using rxjs 6+ .
There is a set of services(all are different and make different http calls) . Basically these services are used to initialize the app.
My requirement is to call these services in parallel and wait untill all of them are finished . Once finished , I need to process the responses received . I have studied a lot on this and found that forkJoin and combineLatest can be my friend. And i have implemented the same in the following way :
getInitialData() {
this._authService.openCorsConnection().subscribe(res => {
forkJoin(
this._authService.x(),
this._tService.getSystemDate(),
this._tService.getTemp(),
this._tService.getSettings()
).pipe(
catchError( err => {
console.log('gor error',err);
if (err.status == 401) {
// this._router.navigateByUrl('/unauthorize');
return EMPTY;
} else {
return throwError(err);
}
}),
map(([x,y,z,p])=>{
console.log('sucesss',{x,y,z,p});
return {x,y,z,p}
}),
finalize(()=>{
console.log('Executing Finally');
processData();
})
);
});
}
But These are not executing . Here are the services :
x(): Observable<any> {
const xUrl = EnvironmentConfiguration.endPointConfig.xServiceEndpoint;
let serviceMethod: String = 'x';
let requestObj: any = this._utilHelperSvc.getRequestObject(xUrl , { "y": "" }, serviceMethod);
return this._http.post<any>(requestObj.url,{ "pid": "" } , requestObj)
.pipe(
catchError(this.handleError('x', {}))
);
}
Do i need to modify services ? I am not able to figure out how to solve this.
Can anybody please Suggest some better solution or different approach for this ?
Thanks.
TL;DR
Demos :
Explanation
You don't need to wait for forkJoin
to process the responses, it works the other way around. Instead, prepare all the jobs that need to be done, and simply wait for completion.
Here is what I mean:
let process1$ = timer(1000);
let process2$ = timer(2000);
let process3$ = timer(3000);
const doSthg1 = pipe(
tap(() => console.log('Process 1 has finished')),
map(() => 'Process 1 has finished')
)
const doSthg2 = pipe(
tap(() => console.log('Process 2 has finished')),
map(() => 'Process 2 has finished')
)
const doSthg3 = pipe(
tap(() => console.log('Process 3 has finished')),
map(() => 'Process 3 has finished')
)
forkJoin(doSthg1(process1$), doSthg2(process2$), doSthg3(process3$)).subscribe(() => {
console.log('Now I am complete');
});
This works as long as your processes are not chained, i.e the input of one does not depend on the output of another.
why isn't my code working ?
Because you don't actually subscribe to forkJoin
.
Solution : for example, you can use concatMap
to transform this._authService.openCorsConnection()
into another Observable :
getInitialData(){
this._authService.openCorsConnection().pipe(
concatMap(() => forkJoin(this._authService.x(),
this._tService.getSystemDate(),
this._tService.getTemp(),
this._tService.getSettings()
)),
catchError(err => {
console.log('gor error', err);
if (err.status == 401) {
return EMPTY;
} else {
return throwError(err);
}
}),
).subscribe(() => processData());
}
zip
You could use zip
instead of forkJoin
if:
implementation:
zip(doSthg1(process1$), doSthg2(process2$), doSthg3(process3$))
.subscribe(
([x,y,z]) => {
console.log(`x=${x}, y=${y}, z=${z}`);
});