Search code examples
javascriptrxjsthrottling

RxJS throttle behavior; get first value immediately


Example Plunkr: https://plnkr.co/edit/NZwb3ol8CbZFtSc6Q9zm?p=preview

I am aware that there are these 3 throttle methods for RxJS (5.0 beta.4):

auditTime(), throttleTime() and debounceTime()

The behavior I am looking for is the one lodash does by default on throttle:

    1. Give me the first value immediately!
    1. on consecutive values, hold values for the given delay, then emit last occurred value
    1. when throttle delay expired, go back to state (1)

In theory this should look like:

inputObservable
  .do(() => cancelPreviousRequest())
  .throttleTime(500)
  .subscribe((value) => doNextRequest(value))

But

  • throttleTime never gives me the last value, if emitted in the throttle timeout
  • debounceTime doesn't trigger immediately
  • auditTime doesn't trigger immediately

Could I combine any of the RxJS methods to achieve the described behavior?


Solution

  • I took the auditTime operator and changed 2 lines to achieve the desired behavior.

    New plunker: https://plnkr.co/edit/4NkXsOeJOSrLUP9WEtp0?p=preview

    Original:

    Changes:

    from (auditTime):

    protected _next(value: T): void {
      this.value = value;
      this.hasValue = true;
      if (!this.throttled) {
        this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
      }
    }
    
    clearThrottle(): void {
      const { value, hasValue, throttled } = this;
      if (throttled) {
        this.remove(throttled);
        this.throttled = null;
        throttled.unsubscribe();
      }
      if (hasValue) {
        this.value = null;
        this.hasValue = false;
        this.destination.next(value);
      }
    }
    

    to (auditTimeImmediate):

    protected _next(value: T): void {
        this.value = value;
        this.hasValue = true;
        if (!this.throttled) {
            // change 1:
            this.clearThrottle();
        }
    }
    
    clearThrottle(): void {
        const { value, hasValue, throttled } = this;
        if (throttled) {
            this.remove(throttled);
            this.throttled = null;
            throttled.unsubscribe();
        }
        if (hasValue) {
            this.value = null;
            this.hasValue = false;
            this.destination.next(value);
            // change 2:
            this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, this));
        }
    }
    

    So I start the timeout after the value was nexted.

    Usage:

    inputObservable
      .do(() => cancelPreviousRequest())
      .auditTimeImmediate(500)
      .subscribe((value) => doNextRequest(value))