Search code examples
angularrxjsobservableangular2-observablesrxjs7

RxJs observable to listen for 10 seconds, return only 5 of the received values, discard the rest, and continue listening?


I have an observable that will be taking in multiple real-time trade values (possibly many per second) from a SignalR hub. What I am trying to achieve is an observable that continuously (every 10 seconds) outputs a sample of 5 trades that occurred in those last 10 seconds.

I wrote an observable pipe to try to achieve this by adding all of the received trades into a buffer for 10 seconds, then creating an observable for each of the trades in the buffer array, using 'concatMap' and 'from'. Then, creating another buffer that collects 5 values, and emits them.

this.bufferedTradeObservable$ = this.tradeReceived
      .pipe(
        tap(v => console.log('pipe-start: ', v)),
        distinct((e: Trade) => e.tradeId),
        bufferTime(10000),
        concatMap((tradeArray) => {
            return from(tradeArray);
        }),
        bufferCount(5),
        tap(v => console.log('pipe-end: ', v))
      );

However, the pipe keeps emitting all of the values that it receives in the 10 second window, but in groups of 5. I tried adding a take(5)in the pipe after the concat map, and it works correctly for the first batch of 5 values, but then the observable "completes" and stops listening for new values. I also tried adding a filter with index after the concatMap like this:

filter((v, i) => (i < 6 )),

This works for the first batch of 5 values, but then keeps filtering out every value, so a second buffer of 5 never gets created. Also this use case of the filter appears to be deprecated.

I'm not sure if I'm overlooking something obvious here, but I've looked at many of the rxjs operators and can't find a way to achieve this


Solution

  • Sounds like all you need is bufferTime. You can decide what to keep and what to throw away afterward.

    this.bufferedTradeObservable$ = this.tradeReceived.pipe(
      // Buffer for 1 seconds
      bufferTime(10000),
      // Only emit the last 5 values from the buffer.
      map(buffer => buffer.slice(-5))
    );