Search code examples
angular6rxjs6rxjs-pipeable-operators

RxJs Interval with takeUntil to publish last value


I have some code which polls until a task is complete

See below

this.simulationStatus =
  interval(2000).pipe(
    switchMap(
      () => from(this.simulationService.getSimulationStatus(this.route.snapshot.paramMap.get('jobId')))),
    takeUntil(this.stopPoll),
    tap(simulation => {
      if (simulation && simulation.complete) {
        if (this.stopCount == 1) {
          // Get once after complete
          this.stopPoll.next(true);
        }
        this.stopCount++;
      }
    })
  );

I have tried using takeUntil and takeWhile the problem is that that the last value is never published once the task is complete.

To get around this I have to include the tap method to with the stopPoll subject and incrementing the stopCount to get the last value.

So the above works but just feels a bit messy, I'm sure there must be a better way of achieving this?

I would have expected takeUntil to publish the last value or have an override to tell it to e.g takeUntil(observable, {publishLast: true})

BTW Update, the observable is subscribed to by an Angular 6 template Thanks in advance


Solution

  • One thing you can do is use a custom takeWhile-like operator like this:

    const completeWith = <T>(predicate: (arg: T) => boolean) => (
      source: Observable<T>,
    ) =>
      new Observable<T>(observer =>
        source.subscribe(
          value => {
            observer.next(value);
            if (predicate(value)) {
              observer.complete();
            }
          },
          error => observer.error(error),
          () => observer.complete(),
        ),
      );
    

    It doesn't seem like a good idea to see it as a variation of takeWhite because it's not just taking values while a condition holds, but also emits an extra value.

    It might be that a more elegant solution would be make the simulation status observable emit two kinds of values: next notifications and completion notifications, similarly to how materialize/dematerialize operators work.