
RxJS expand and delay operators not working as expected


I'm trying to use the RxJS expand operator together with delay to poll the server until the HTTP call reports success.

I use the delay operator as well because the HTTP call returns the following object:

{
    retryAfterInSeconds: 30,
    status: 'Running'
}

and I want to skip unnecessary calls if I know when I should check the status again.

So, I implemented the behavior as follows:

private pollExportStatus(entity: any): void {
    let waitingTime = 0;

    this.service
        .getStatus(entity)
        .pipe(
            untilDestroyed(this),
            expand((x: Status) => {
                waitingTime += x.retryAfterInSeconds;

                const shouldContinuePolling =
                    waitingTime < 300 &&
                    x.status !== StatusType.Succeeded &&
                    x.status !== StatusType.Failed;

                if (!shouldContinuePolling) {
                    return EMPTY;
                }

                return this.service
                    .getStatus(entity)
                    .pipe(delay(x.retryAfterInSeconds * 1000));
            }),
            last()
        )
        .subscribe((x: Status) => {
            if (waitingTime >= 300 || x.status !== StatusType.Succeeded) {
                console.log('Failed');

                return;
            }

            console.log('Succeeded');
        });
}
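
The stopping rule in the code above can be checked in isolation, without any observables. The following is a minimal sketch under stated assumptions: simulatePolling, PollingResult and the string-union StatusType are hypothetical names introduced only for illustration (the original StatusType is not shown in the question), and the responses are fed in as a plain array instead of coming from the server.

```typescript
// Hypothetical stand-ins: the question does not show how Status/StatusType are declared.
type StatusType = 'Running' | 'Succeeded' | 'Failed';

interface Status {
  retryAfterInSeconds: number;
  status: StatusType;
}

interface PollingResult {
  responsesConsumed: number;
  waitingTime: number;
  outcome: 'Succeeded' | 'Failed';
}

// Replays the same bookkeeping as the expand callback over a fixed list of responses.
function simulatePolling(responses: Status[], maxWaitInSeconds = 300): PollingResult {
  let waitingTime = 0;
  let responsesConsumed = 0;
  let lastStatus: StatusType | undefined;

  for (const response of responses) {
    responsesConsumed += 1;
    lastStatus = response.status;
    waitingTime += response.retryAfterInSeconds;

    const shouldContinuePolling =
      waitingTime < maxWaitInSeconds &&
      response.status !== 'Succeeded' &&
      response.status !== 'Failed';

    if (!shouldContinuePolling) {
      break; // corresponds to returning EMPTY from expand
    }
  }

  // Same check as in the subscribe callback: over budget or not Succeeded means failure.
  const succeeded = waitingTime < maxWaitInSeconds && lastStatus === 'Succeeded';
  return { responsesConsumed, waitingTime, outcome: succeeded ? 'Succeeded' : 'Failed' };
}

const running: Status = { retryAfterInSeconds: 30, status: 'Running' };
const done: Status = { retryAfterInSeconds: 30, status: 'Succeeded' };

// The server never finishes: the 300-second budget is used up by the 10th response.
const timedOut = simulatePolling(Array.from({ length: 20 }, () => running));

// The server finishes on the third response.
const finished = simulatePolling([running, running, done]);
```

Note that waitingTime also counts the retry interval of the final response, even though no further call follows it, which matches the bookkeeping in the question.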

Everything works as expected except one thing: even if I return the next call (inside expand) with a delay:

return this.service
  .getStatus(entity)
  .pipe(delay(x.retryAfterInSeconds * 1000));

the first call is not delayed and I don't understand why.

Finally, I found a solution that works as expected.

I replaced the expand return with the following one:

return timer(x.retryAfterInSeconds * 1000).pipe(
    concatMap(() => this.service.getStatus(entity))
);

but I want to understand why the first one does not work and the second one does.

Note: the first approach works if I use a simple observable (created with the of operator), but not with an HTTP call.


Solution

  • Tobias provided a way of thinking in terms of source emissions passing down through the operators. 90% of the time that’s how I think too, but it’s important to know what actually happens in terms of subscriptions and the creation of observables, which goes bottom up.

    Let’s follow the stream bottom up.

    You subscribe. But what are you subscribing to? The last piped operator, last(). Internally it subscribes to a source observable and, when that source completes, emits its last value.

    So what does last() subscribe to? I.e. what’s its source observable? The observable created by expand(…).

    Expand does a complicated dance, but it’s essentially a recursive mergeMap: it emits each source emission plus any emissions from the observable returned by the provided function, which is applied to every emission in turn. The key thing is that its source observable*, whose first emission triggers the recursion, is this.service.getStatus(entity), and this has no delay. The second emission comes from this.service.getStatus(entity).pipe(delay(30 * 1000)). All delay does is delay emissions; it does not delay the subscription. The delay operator subscribes internally to this.service.getStatus(entity) immediately, and for an HTTP observable subscribing is what sends the request. Thus you’ve just hit the server twice in a row. With timer(...).pipe(concatMap(...)), the subscription to getStatus only happens once the timer fires, which is why that version waits before calling the server.

    * unaffected by untilDestroyed(this), which just subscribes to its source and completes when this is destroyed.
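
The difference between delaying an emission and delaying a subscription can be seen in a small toy model. This is a sketch only and deliberately not RxJS: VirtualClock, Cold, getStatus, withDelay, afterTimer and trace are all hypothetical names, the "request" responds instantly, and time is simulated instead of real. withDelay stands in for delay(ms), and afterTimer stands in for timer(ms).pipe(concatMap(() => source)).

```typescript
// Toy model of cold-observable timing on a virtual clock. NOT RxJS; all names are hypothetical.
type Observer<T> = (value: T) => void;
type Cold<T> = (observer: Observer<T>) => void; // calling the function == subscribing

class VirtualClock {
  now = 0;
  private queue: { at: number; run: () => void }[] = [];

  schedule(ms: number, run: () => void): void {
    this.queue.push({ at: this.now + ms, run });
  }

  // Runs every scheduled task in time order, advancing `now` as it goes.
  flush(): void {
    while (this.queue.length > 0) {
      this.queue.sort((a, b) => a.at - b.at);
      const task = this.queue.shift()!;
      this.now = task.at;
      task.run();
    }
  }
}

// Stand-in for an HTTP call: the "request" is sent at subscription time.
function getStatus(clock: VirtualClock, log: string[]): Cold<string> {
  return (observer) => {
    log.push(`request sent at ${clock.now}ms`);
    observer('Running'); // responds instantly to keep the model small
  };
}

// Models delay(ms): subscribes to the source right away, postpones only the emission.
function withDelay<T>(clock: VirtualClock, source: Cold<T>, ms: number): Cold<T> {
  return (observer) => source((value) => clock.schedule(ms, () => observer(value)));
}

// Models timer(ms) + concatMap: waits first, subscribes to the source afterwards.
function afterTimer<T>(clock: VirtualClock, source: Cold<T>, ms: number): Cold<T> {
  return (observer) => clock.schedule(ms, () => source(observer));
}

// Subscribes once, drains the virtual clock, and returns what happened and when.
function trace(build: (clock: VirtualClock, log: string[]) => Cold<string>): string[] {
  const clock = new VirtualClock();
  const log: string[] = [];
  build(clock, log)((value) => log.push(`${value} received at ${clock.now}ms`));
  clock.flush();
  return log;
}

const delayLog = trace((clock, log) => withDelay(clock, getStatus(clock, log), 30_000));
// ["request sent at 0ms", "Running received at 30000ms"]

const timerLog = trace((clock, log) => afterTimer(clock, getStatus(clock, log), 30_000));
// ["request sent at 30000ms", "Running received at 30000ms"]
```

In both traces the value arrives at 30000ms, but only the timer-style version postpones the request itself, which is the behavior the question was after.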

    One point here is that untilDestroyed(this) should be last (EDIT: actually second to last, since you want last() to run on completion). If it is at the top and there are delays in the middle, the stream may continue doing work after this is destroyed.

    If you replace the first HTTP call with

    of({
        retryAfterInSeconds: 30,
        status: 'Running'
    })
    

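    then the first HTTP request will be the one inside expand, with its emission delayed. Note that the request is still sent as soon as expand subscribes to it; delay only holds back the response, so successive requests end up spaced by the delay.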