javascript angular rxjs ngrx rxjs-pipeable-operators

How can I determine the number of values emitted during the debounce time?


Given:

An NgRx effect that handles a request to "zoom in" on a display. It is notified every time a user clicks the corresponding button.

public readonly zoomIn$ = createEffect(
  () =>
    this.actions$.pipe(
      ofType(zoomIn),
      tap(() => {
        scale();
      }),
    ),
  { dispatch: false },
);

Note: The zoomIn action cannot and does not carry any payload. Treat it purely as a trigger.

Problem:

Redrawing is expensive and in some cases takes a few seconds to produce the new scale. So if you want to zoom in several times in a row, you are forced to wait for each redraw.

Solution:

Use the debounceTime operator to postpone the call to scale() until the user has finished clicking. Sounds good. The only problem is that debounceTime passes on just the latest value, whereas what we need is the number of values (user clicks) that debounceTime silenced.

More generally, the task is: how do you count the values that the source stream emitted and the debounceTime operator silenced?
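To make the goal concrete, here is a small dependency-free sketch of the result being asked for. It is not RxJS code: `countPerBurst` and the click timestamps are hypothetical names and data used only for illustration, and it assumes a burst ends when the silence between two clicks is longer than the debounce time.

```typescript
// Plain-TypeScript model of the desired output (not RxJS code).
// `timestamps` are hypothetical click times in milliseconds, sorted ascending.
function countPerBurst(timestamps: number[], dueTime: number): number[] {
  const counts: number[] = [];
  let current = 0;
  let last = Number.NEGATIVE_INFINITY;
  for (const t of timestamps) {
    // A silence longer than dueTime closes the previous burst.
    if (current > 0 && t - last > dueTime) {
      counts.push(current);
      current = 0;
    }
    current += 1;
    last = t;
  }
  // The final burst is reported once the clicks stop.
  if (current > 0) counts.push(current);
  return counts;
}

// Three quick clicks, a pause, then two more clicks.
const bursts = countPerBurst([0, 100, 200, 1000, 1100], 300);
// bursts is [3, 2]
```

With a plain debounceTime(300) the effect would run twice but would not know about the 3 and the 2, which is exactly the missing piece.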


Solution

  • My solution is to create a custom pipeable operator that does this.

    What do we need? Of course, we need a debounceTime operator.

    .pipe(
      debounceTime(300),
    )
    

    Then we need to count the values that have been emitted. The scan operator works much like the familiar reduce function: we give it an initial value and increment a counter for every value received from the source stream. It has to be placed before the debounceTime operator, so that the stream now carries a running count.

    .pipe(
      scan(acc => acc + 1, 0),
      debounceTime(300),
    )
    

    When debounceTime passes on the latest count, how do we know how many values were muted? We compare it with the previous count that got through. The pairwise operator gives us the previous value alongside the current one, and the map operator computes the difference between them.

    .pipe(
      scan(acc => acc + 1, 0),
      debounceTime(300),
      pairwise(),
      map(([previous, current]) => current - previous),
    )
    

    If you try this as it stands, you will notice that it does not work the first time. The problem lies in the pairwise operator. It emits pairs of values (previous and current), so it waits until it has received at least two values before it emits anything. Is that fair? Yes, it is. That is why we need to cheat a little and supply a first value (0) with the startWith operator, placed after debounceTime so that the seed itself is not debounced.

    The final implementation

    import { Observable, OperatorFunction } from 'rxjs';
    import { debounceTime, map, pairwise, scan, startWith } from 'rxjs/operators';

    /**
     * Emits only after a particular time span has passed without another source emission.
     * The emitted value is the number of source values received since the previous emission.
     *
     * @param dueTime how long, in milliseconds, the source must stay silent before the count is emitted.
     * @returns an OperatorFunction that accepts any source stream and emits counts
     */
    export const debounceCounter =
      (dueTime: number): OperatorFunction<unknown, number> =>
      (source: Observable<unknown>): Observable<number> =>
        source.pipe(
          // Running total of every value received from the source.
          scan(acc => acc + 1, 0),
          debounceTime(dueTime),
          // Seed with 0 so that pairwise can emit for the very first burst.
          startWith(0),
          pairwise(),
          // Difference between consecutive totals = values in the latest burst.
          map(([previous, current]) => current - previous),
        );
    
    

    Usage example

    public readonly zoomIn$ = createEffect(
      () =>
        this.actions$.pipe(
          ofType(zoomIn),
          debounceCounter(300),
          tap(times => {
            // scale n-times
          }),
        ),
      { dispatch: false },
    );
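To see why the startWith(0) seed in the pipeline matters, the stages after debounceTime can be traced with plain arrays. This is only a sketch, not RxJS code: `pairwiseDiff` and `debouncedTotals` are hypothetical names, and the totals assume three bursts of 3, 2 and 4 clicks.

```typescript
// Plain-array model of the stages after debounceTime (not RxJS code).

// Mirrors pairwise() followed by map(([previous, current]) => current - previous).
function pairwiseDiff(values: number[]): number[] {
  const diffs: number[] = [];
  let previous: number | undefined;
  for (const current of values) {
    // Nothing can be emitted until a previous value exists.
    if (previous !== undefined) diffs.push(current - previous);
    previous = current;
  }
  return diffs;
}

// Running totals that survive debounceTime for bursts of 3, 2 and 4 clicks.
const debouncedTotals = [3, 5, 9];

// Without a seed, the first burst never forms a pair and is lost.
const withoutSeed = pairwiseDiff(debouncedTotals); // [2, 4]

// Mirrors startWith(0): the seed pairs with the first total.
const withSeed = pairwiseDiff([0, ...debouncedTotals]); // [3, 2, 4]
```

The seeded version recovers the size of every burst, including the first one, which is what the tap callback receives as `times`.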