Search code examples
rxjsnestedobservablesubscription

Rxjs observable with Inner subscription taking long time and outer observable executed firstly in subsequent requests


I have a button that starts a Observable, then I subscribe to the result and nested I have a subscription that depends on the previous Observable result.

this.getData(params).subscribe(result => {
    // Make some first level process
    console.log('[Outter Execution]: ',result);

    this.getInnerData(result).subscribe(res => {
        // Make some inner level process
        console.log('[Inner Execution]: ',res);
    });
});

Depends on the click speed of the user, the sequence is not the same:

//User click slowly

[Outer Execution]
[Inner Execution]

[Outer Execution]
[Inner Execution]

//User start clicking quickly

[Outer Execution]
[Inner Execution]

[Outer Execution]
[Inner Execution]

[Outer Execution]
[Outer Execution]
[Outer Execution]

[Inner Execution]
[Inner Execution]
[Inner Execution]

[Outer Execution]
[Inner Execution]
[Outer Execution]
[Inner Execution]

As you can see, if the nested subscription take long time and the user click again before the inner subscription is resolved, the first [Outer Execution] message is logged out before the inner execution was solved. Some time later, the previous long time inner subscriptions are solved and the logged messages are returned.

I tried to use switchMap and mergeMap operators without success.

[Edited]: The functionality that I need, is the execution as a block, and the next outer execution needs to be executed after the first execution complete (outer and inner subscription).


Solution

  • To ensure that the processing waits, you need to use concatMap on the main Observable, not on the dependent Observable. Otherwise it is not waiting for the set.

    I forked before your latest changes, but this is what I came up with that seems to work:

      private clickSubject = new Subject<number>();
      clickAction$ = this.clickSubject.asObservable();
    
      ngOnInit() {
        this.clickAction$
          .pipe(
            concatMap(value => this.mainProcess(value)
              .pipe(
                mergeMap(x => this.dependantProcess(x))
              )
            )
          )
          .subscribe();
      }
    
      onClick(value) {
        // Emits a value into the action stream
        this.clickSubject.next(value);
      }
    
      mainProcess(value) {
        console.log("[Emitted] Main", value);
        return of(value).pipe(delay(10));
      }
    
      dependantProcess(value) {
        console.log("[Emitted] Dependent", value);
        return of(value).pipe(delay(2000));
      }
    

    Notice that the concatMap is used to wait for the mainProcess.

    • This code reacts to user clicks by defining an action stream using an RxJS Subject.

    • Each time the user clicks the button, the action stream emits.

    • The action stream pipeline uses concatMap to cache the request and wait to process it until the prior request is processed.

    • When the main process emits, the dependent process executes.

    • When the pipeline is complete (main and its dependent process), then the next cached request is processed by the concatMap.

    Make sense?

    You can find the updated code here:

    https://stackblitz.com/edit/angular-dependent-order-deborahk