Search code examples
reactive-programmingsystem.reactiveunirx

Reactive accumulation of data coming in bursts


The problem: there is a stream of numeric values. Values are pushed in bursts so 100 values can come very close to each other (time-wise) say each 5-10 ms and then it possibly stops for a while, then can burst again.The idea is to show accumulated value (sum) of windows of at most length of 500 ms.

My first attempt was with Buffer(500ms) but this is causing constant pumping of events (every 500 ms) with the sum of 0 (as the accumulated buffer items os 0), it could be fixed with filtering by empty buffers but I would really like to avoid that entirely and only open the buffering after a value is actually pushed after a period of "silence".

Additional restrictions: the implementation is UniRx which does not contain all the Rx operators, notably Window (which I suspect could be useful in that case) so solution is limited to basic operators including Buffer.


Solution

  • Since you just want the sum, using Buffer is overkill. We can run a Scan or Aggregation.

      var burstSum =
        source
            .Scan(0, (acc, current) => acc + current)
            .Throttle(TimeSpan.FromMilliseconds(500))
            .Take(1)
            .Repeat();
    

    This will start a stream which accumulates the sum until the stream has been idle for at least 500ms.

    But if we want to emit at least every time bucket, we'll have to go a different path. We're making two assumptions:

    1. The sum of time-intervals between elements should be equal to the time-interval between the first and last element.
    2. Throttle will release the last value when the stream completes.

      source
          .TimeInterval()
          .Scan((acc, cur) => new TimeInterval<int>(acc.Value + cur.Value, acc.Interval + cur.Interval))
          .TakeWhile(acc => acc.Interval <= TimeSpan.FromMilliseconds(500))
          .Throttle(TimeSpan.FromMilliseconds(500))
          .Select(acc => acc.Value)
          .Take(1)
          .Repeat();