Search code examples
rxjsobservablerxjs-pipeable-operators

DebounceTime after first value in RxJS


I need a specific behavior that I can't get with the RxJS operators. The closest would be to use DebounceTime only for values entered after the first one, but I can't find a way to do it. I have also tried with ThrottleTime but it is not exactly what I am looking for, since it launches intermediate calls, and I only want one at the beginning that is instantaneous, and another at the end, nothing else.

ThrottleTime

throttleTime(12 ticks, { leading: true, trailing: true })

source:             --0--1-----2--3----4--5-6---7------------8-------9---------
throttle interval:  --[~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~I~~~~~~~~~~~]--------
output:             --0-----------3-----------6-----------7-----------9--------


source_2:           --0--------1------------------2--------------3---4---------
throttle interval:  --[~~~~~~~~~~~I~~~~~~~~~~~]---[~~~~~~~~~~~]--[~~~~~~~~~~~I~
output_2:           --0-----------1---------------2--------------3-----------4-

DebounceTime

debounceTime(500)

source:             --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval:  -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output:             -----------1--------3--------------------------------13---------

What I want

debounceTimeAfterFirst(500) (?)

source:             --0--1--------3------------4-5-6-7-8-9-10-11--13----------------
debounce_interval:  -----[~~~~~]--[~~~~~]--------------------------[~~~~~]----------
output:             --0--------1--3------------4-------------------------13---------

As you see, the debounce time is activated when a new value is entered. If the debounce time passes and any new value has been entered, it stops the listening the debounceTime action and waits to start a new one.

Edit: I forgot to comment that this must be integrated with NgRx’s Effects, so it must be a continuous stream that mustn't be completed. Terminating it would probably cause it to stop listening for dispatched actions.


Solution

  • I would use a throttle combined with a debounceTime:

    • throttle: from Documentation Emit value on the leading edge of an interval, but suppress new values until durationSelector has completed.

    • debounceTime: from Documentation Discard emitted values that take less than the specified time between output.

    I would use a throttle stream to get the raising edge (the first emission) and then the debounce stream would give us the falling edge.

    const source = fromEvent(document.getElementsByTagName('input'), 'keyup').pipe(
      pluck('target', 'value')
    );
    
    const debounced = source.pipe(
      debounceTime(4000),
      map((v) => `[d] ${v}`)
    );
    
    const effect = merge(
      source.pipe(
        throttle((val) => debounced),
        map((v) => `[t] ${v}`)
      ),
      debounced
    );
    
    effect.subscribe(console.log);
    

    See RxJS StackBlitz with the console open to see the values changing.

    I prepared the setup to adapt it to NgRx which you mention. The effect I got working is:

    @Injectable({ providedIn: 'root' })
    export class FooEffects {
      switchLight$ = createEffect(() => {
        const source = this.actions$.pipe(
          ofType('[App] Switch Light'),
          pluck('onOrOff'),
          share()
        );
        const debounced = source.pipe(debounceTime(1000), share());
        return merge(source.pipe(throttle((val) => debounced)), debounced).pipe(
          map((onOrOff) => SetLightStatus({ onOrOff }))
        );
      });
    
      constructor(private actions$: Actions) {}
    }
    
    

    See NgRx StackBlitz with the proposed solution working in the context of an Angular NgRx application.

    • share: This operator prevents the downstream paths to simultaneously fetch the data from all the way up the chain, instead they grab it from the point where you place share.

    I also tried to adapt @martin's connect() approach. But I don't know how @martin would "reset" the system so that after a long time if a new source value is emitted would not debounce it just in the same manner as you first run it, @martin, feel free to fork it and tweak it to make it work, I'm curious about your approach, which is very smart. I didn't know about connect().

    @avicarpio give it a go on your application and let us know how it goes :)