I have a button that starts a Observable, then I subscribe to the result and nested I have a subscription that depends on the previous Observable result.
this.getData(params).subscribe(result => {
// Make some first level process
console.log('[Outter Execution]: ',result);
this.getInnerData(result).subscribe(res => {
// Make some inner level process
console.log('[Inner Execution]: ',res);
});
});
Depends on the click speed of the user, the sequence is not the same:
//User click slowly
[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]
//User start clicking quickly
[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]
[Outer Execution]
[Outer Execution]
[Outer Execution]
[Inner Execution]
[Inner Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]
As you can see, if the nested subscription take long time and the user click again before the inner subscription is resolved, the first [Outer Execution] message is logged out before the inner execution was solved. Some time later, the previous long time inner subscriptions are solved and the logged messages are returned.
I tried to use switchMap and mergeMap operators without success.
[Edited]: The functionality that I need, is the execution as a block, and the next outer execution needs to be executed after the first execution complete (outer and inner subscription).
To ensure that the processing waits, you need to use concatMap
on the main Observable, not on the dependent Observable. Otherwise it is not waiting for the set.
I forked before your latest changes, but this is what I came up with that seems to work:
private clickSubject = new Subject<number>();
clickAction$ = this.clickSubject.asObservable();
ngOnInit() {
this.clickAction$
.pipe(
concatMap(value => this.mainProcess(value)
.pipe(
mergeMap(x => this.dependantProcess(x))
)
)
)
.subscribe();
}
onClick(value) {
// Emits a value into the action stream
this.clickSubject.next(value);
}
mainProcess(value) {
console.log("[Emitted] Main", value);
return of(value).pipe(delay(10));
}
dependantProcess(value) {
console.log("[Emitted] Dependent", value);
return of(value).pipe(delay(2000));
}
Notice that the concatMap
is used to wait for the mainProcess.
This code reacts to user clicks by defining an action stream using an RxJS Subject
.
Each time the user clicks the button, the action stream emits.
The action stream pipeline uses concatMap
to cache the request and wait to process it until the prior request is processed.
When the main process emits, the dependent process executes.
When the pipeline is complete (main and its dependent process), then the next cached request is processed by the concatMap
.
Make sense?
You can find the updated code here:
https://stackblitz.com/edit/angular-dependent-order-deborahk