Search code examples
rxjsreactivexdebouncing

is there a version of debounce that selects the maximum value in the time frame? (RxJS)


Basically just the title. I have merged two streams, but they both emit once every second, and I want to only take the maximum one per second. my merged stream has two values in it per second, but my current debounce method won't pay attention to which one is bigger.

Are there any solutions to this? (i.e. a way to pass a function to debounce)

Here is my code:

var a = [11, 12, 14, 15, 18, 20];
var b = [1, 2, 8, 16, 43, 100];

const first = interval(1000).pipe(
  take(a.length),
  map(i => a[i])
);

const second = interval(1000).pipe(
  take(b.length),
  map(i => b[i])
);

const example = first.pipe(merge(second));

const example2 = example.pipe(
  debounce(() => interval(1))
);

I have tried using scan and max, but they seem to not have my desired functionality.


Solution

  • I'm really uncertain about what it is that you actually want as your description isn't very clear. So, I'll give you two possibilities and we can see if one of them rings true.

    The first is to combine the two streams and emit only the largest value of each. This doesn't require any timing and doesn't resemble debounce. It just pairs the source observables.

    zip(first, second).pipe(
      map(([x, y]) => Math.max(x, y))
    ).subscribe(console.log);
    

    The second approach is to buffer values for a second and then emit the largest of the values buffered. This approach doesn't pair off the source streams. It doesn't care how many streams there are or how often they emit. One second could have 12 values and the next could have 1 value. It will just emit the biggest.

    merge(first, second).pipe(
      bufferTime(1000),
      filter(buffer => buffer.length > 0),
      map(buffer => Math.max(...buffer))
    ).subscribe(console.log);
    

    These two do very different things, but probably have a similar output for your specific case. The First is closer to what your problem describes and the second one is closer to what your title describes.


    As an aside, here's a more functional approach to create your first and second streams.

    const first = zip(
      from([11, 12, 14, 15, 18, 20]),
      interval(1000),
      (x, y) => x
    );
    
    const second = zip(
      from([1, 2, 8, 16, 43, 100]),
      interval(1000),
      (x, y) => x
    ); 
    

    The added value here is that interval doesn't need to be modified by reading a variable from its closure. Here it can remain 'pure' because zip will unsubscribe once the array-stream completes.