typescript rxjs rxjs6

RxJS bufferedAmount / accumulate value / reduce with reset


I have a stream of values

[ 0.5, 0.3, 0.4, 0.6, 1.4, 0.3, 0.6 ]

and I want to turn this into

[           1         2         1 ]

That is, the values of the first stream are accumulated until the running total reaches a whole number (at least 1). The whole number is then emitted, and the fractional remainder carries over into the next accumulation.

It completely boggles my mind; I think the solution lies just around the corner with switchMap.


Solution

  • Thank you Andrei,

    I've used your code and changed it to a custom operator.

    
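    import { MonoTypeOperatorFunction, Observable } from 'rxjs';

    // Accumulates incoming values and emits the whole part of the running
    // sum once it reaches `amount`; the fractional remainder is carried over.
    export const bufferAmount = (
      amount: number
    ): MonoTypeOperatorFunction<number> => (
      source: Observable<number>
    ): Observable<number> =>
      new Observable<number>((observer) => {
        // Running total of everything not yet emitted.
        let bufferSum = 0;
        // Returning the inner subscription lets unsubscribing tear down the source.
        return source.subscribe({
          next(value) {
            bufferSum += value;
            // Below the threshold: keep accumulating, emit nothing.
            if (bufferSum < amount) return;
            // Emit the whole part and keep only the remainder.
            const nextSum = Math.trunc(bufferSum);
            bufferSum -= nextSum;
            observer.next(nextSum);
          },
          error(error) {
            observer.error(error);
          },
          complete() {
            observer.complete();
          },
        });
      });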
    
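    // `ticker` is assumed to be the source Observable<number> of fractional values.
    ticker.pipe(bufferAmount(1)).subscribe((whole) => console.log(whole));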
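
To sanity-check the arithmetic inside `next` without setting up a stream, here is a minimal sketch that runs the same accumulate, truncate, and carry-over steps on a plain array. The helper name `accumulateWhole` and its `threshold` parameter are made up for illustration and are not part of RxJS or of the operator above.

```typescript
// Hypothetical helper (not an RxJS API): replays the operator's arithmetic
// over a plain array instead of an Observable.
function accumulateWhole(values: number[], threshold = 1): number[] {
  const emitted: number[] = [];
  let sum = 0; // running total of everything not yet emitted

  for (const value of values) {
    sum += value;
    // Below the threshold: keep accumulating, emit nothing.
    if (sum < threshold) continue;
    // Emit the whole part and carry the fractional remainder forward.
    const whole = Math.trunc(sum);
    sum -= whole;
    emitted.push(whole);
  }
  return emitted;
}

console.log(accumulateWhole([0.5, 0.3, 0.4, 0.6, 1.4, 0.3, 0.6])); // [ 1, 2, 1 ]
```

With the values from the question this gives 1, 2, 1, matching the expected output. The carried remainder is a floating-point number, so it may be slightly off from the exact decimal (for example 0.2000000000000002 rather than 0.2); inputs that land exactly on a whole number could be affected by that rounding.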