
Reactive Extensions rate of change detection from normal movement


Given a sequence of numbers that trends over time, I would like to use Reactive Extensions to raise an alert when there is a sudden absolute change, either a spike or a drop, e.g. 101.2, 102.4, 101.4, 100.9, 95, 93, 85... followed by a slow increase back to 100.

The alert would be triggered on the drop from 100.9 to 95. Each value would have a timestamp, and I am looking for an alert of the form:

LargeChange TimeStamp Distance Percentage

I believe I need to start with Buffer(60, 1) for a 60-sample moving average (with one minute between samples).

While that would give the average value, I can't assign an arbitrary percentage to trigger the alert, since this could vary from signal to signal: one may have more volatility than another.

To get the volatility, I would then take a longer historical time frame, Buffer(14, 1) (these would be 14 days of daily averages of the same signal).

I would then calculate the difference between each value in the buffer and the 14-day average, square each of these deviations, sum the squares, and divide by the number of samples (i.e. the variance).

My questions are:

  1. How would I perform the above volatility calculation? Or is it better to do this outside of Rx and update the volatility value once daily, external to the observable stream calculation (this may make more sense, since it avoids running 14 days' worth of 1-minute samples through it)?

  2. How would we combine the fast moving average and the volatility level (updated once per day) to give alerts? I am seeing Scan and DistinctUntilChanged in posts on SO, but can't work out how to put them together.


Solution

  • I would start by breaking this down into steps. (For simplicity I'll assume the original data source is an observable called values.)

    1. Convert values into an observable of moving averages (we'll call this averages here).
    2. Combine values and averages into an observable that can watch for "extremes".

    For step 1, you may be able to use the built-in Window method that Slugart mentioned in a comment or the similar Buffer method. A Select call after the Window or Buffer can be used to process the array into a single average value object. Something like:

    averages = values.Buffer(60, 1)
                     .Select((buffer) => { /* do average and std dev calculation here */ });
    

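    Note that Buffer(60, 1) already yields overlapping, count-based windows that advance one sample at a time. If you need some other kind of sliding window, you may have to implement your own operator, though I could easily be unaware of one that already exists. Scan combined with a queue seems like a good basis for such an operator if you need to write it.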
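
The calculation left as a comment in the Select call for step 1 could be sketched roughly as follows. This is only a minimal sketch: WindowStats and Summarize are hypothetical names, not part of Rx, and it assumes the population form of the standard deviation (dividing by the number of samples), as described in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of Rx): summarizes one buffer of samples.
public static class WindowStats
{
    // Returns the mean and the population standard deviation of the buffer.
    public static (double Mean, double StdDev) Summarize(IList<double> buffer)
    {
        if (buffer == null || buffer.Count == 0)
            throw new ArgumentException("Buffer must contain at least one sample.", nameof(buffer));

        double mean = buffer.Average();

        // Sum of squared deviations from the mean, divided by the sample count.
        double variance = buffer.Sum(x => (x - mean) * (x - mean)) / buffer.Count;

        return (mean, Math.Sqrt(variance));
    }
}
```

Assuming values is an IObservable<double>, this could be plugged in as values.Buffer(60, 1).Select(b => WindowStats.Summarize(b)). The same helper would also work for the slower 14-day volatility figure if you decide to compute that outside the stream.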

    For step 2, you will probably want to start with CombineLatest followed by a Where clause. Something like:

    extremes = values.CombineLatest(averages, (v, a) => new { Current = v, Average = a })
               .Where((value) => { /* check if value.Current is out of deviation from value.Average */ });
    

    The nice part of this approach is that you can choose between having averages be computed directly from values in line like we did here or be some other source of volatility information with minimal effect on the rest of the code.
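
The check inside the Where clause for step 2 might look something like the sketch below. ExtremeCheck, IsExtreme and the multiplier k are hypothetical names chosen for illustration, and the rule itself (flag a value more than k standard deviations from the mean) is an assumption about what "out of deviation" should mean for your signal.

```csharp
using System;

// Hypothetical helper (not part of Rx): decides whether a sample is an outlier.
public static class ExtremeCheck
{
    // True when `current` is more than `k` standard deviations away from `mean`.
    // `k` is an assumed tuning parameter; a zero or negative deviation never triggers.
    public static bool IsExtreme(double current, double mean, double stdDev, double k = 3.0)
    {
        if (stdDev <= 0) return false;
        return Math.Abs(current - mean) > k * stdDev;
    }
}
```

Using the numbers from the question, the first four samples have a mean of about 101.48 and a standard deviation of about 0.56, so the drop to 95 is flagged while 100.9 is not. Because the threshold scales with the standard deviation, a more volatile signal needs a larger move to trigger, which avoids hard-coding a percentage.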

    Note that the CombineLatest call may cause two subscriptions to values, one directly and one indirectly via a subscription to averages. If the underlying implementation of values makes this undesirable, use Publish and RefCount to get around this.
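
A rough sketch of the Publish and RefCount arrangement is shown below. It assumes the System.Reactive package is referenced; SharedSourceDemo, the Subject standing in for the real data source, and the window size of 3 are all hypothetical choices made to keep the demo short.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedSourceDemo
{
    // Runs a small pipeline and returns how many times the underlying source was subscribed to.
    public static int Run()
    {
        var source = new Subject<double>();   // stand-in for the real data source
        int subscriptions = 0;

        // Defer lets us count each subscription made to the underlying source.
        IObservable<double> values = Observable.Defer<double>(() =>
        {
            subscriptions++;
            return source;
        });

        // Publish + RefCount: a single upstream subscription shared by all observers.
        IObservable<double> shared = values.Publish().RefCount();

        IObservable<double> averages = shared
            .Buffer(3, 1)                          // small window, for the demo only
            .Select(buffer => buffer.Average());

        var extremes = shared.CombineLatest(averages, (v, a) => new { Current = v, Average = a });

        using (extremes.Subscribe(x => Console.WriteLine($"{x.Current} vs {x.Average:F2}")))
        {
            foreach (var v in new[] { 101.2, 102.4, 101.4, 100.9, 95.0 })
                source.OnNext(v);
        }

        return subscriptions;
    }
}
```

Here both branches of CombineLatest subscribe to shared, so the underlying source should only be subscribed to once. Building the same pipeline directly on values would subscribe twice.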

    Also note that CombineLatest will output a value each time either values or averages outputs a value. This means that you will get two events every time averages updates, one for the values update and one for the averages update triggered by the value.

    If you are using sliding windows, that would mean a double update on every value, and it would probably be better to simply include the current value on the Scan output and skip the CombineLatest altogether. You would have something like this instead:

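    // seed: an empty window state (Scan's accumulator receives the previous state and the new value)
    averages = values.Scan(seed, (window, v) => { /* build sliding window and attach current value */ });
    extremes = averages.Where((a) => { /* check if current value is out of deviation for the window */ });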
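
One possible shape for the state that Scan carries along is sketched below. WindowState, Empty and Push are hypothetical names, not Rx types. The state is kept immutable on the assumption that the same seed may be reused across several subscriptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical immutable state for Scan: the latest samples plus the newest value.
public sealed class WindowState
{
    public static readonly WindowState Empty = new WindowState(new double[0], double.NaN);

    public IReadOnlyList<double> Samples { get; }
    public double Current { get; }

    private WindowState(double[] samples, double current)
    {
        Samples = samples;
        Current = current;
    }

    // Returns a new state with `value` appended, keeping only the last `size` samples.
    public WindowState Push(double value, int size)
    {
        double[] next = Samples.Concat(new[] { value })
                               .Skip(Math.Max(0, Samples.Count + 1 - size))
                               .ToArray();
        return new WindowState(next, value);
    }
}
```

With this, the Scan call could read values.Scan(WindowState.Empty, (w, v) => w.Push(v, 60)), and the Where clause can compare w.Current against statistics computed from w.Samples. Note that in this sketch the window includes the current value. If you would rather compare the new value against the samples that preceded it, compute the statistics before pushing. Copying the array on each push is acceptable for a 60-sample window but would be worth replacing with a queue for much larger ones.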
    

    Once you have extremes, you can subscribe to it and trigger your alerts.
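
To produce alerts in the "LargeChange TimeStamp Distance Percentage" form from the question, something like the following could be called from the subscription. AlertFormatter and Format are hypothetical names, and the sketch assumes that distance and percentage are measured against a baseline of your choosing (the previous sample or the moving average).

```csharp
using System;
using System.Globalization;

// Hypothetical helper (not part of Rx): formats a single alert line.
public static class AlertFormatter
{
    // Builds a line in the "LargeChange TimeStamp Distance Percentage" layout.
    // `baseline` is assumed to be non-zero; otherwise the percentage is not meaningful.
    public static string Format(DateTimeOffset timestamp, double baseline, double current)
    {
        double distance = current - baseline;
        double percentage = distance / baseline * 100.0;

        return string.Format(
            CultureInfo.InvariantCulture,
            "LargeChange {0:O} {1:F2} {2:F2}%",
            timestamp, distance, percentage);
    }
}
```

For the drop from 100.9 to 95, this gives a distance of -5.90 and a percentage of -5.85%. Rx's Timestamp operator can supply the timestamp, e.g. extremes.Timestamp().Subscribe(t => Console.WriteLine(AlertFormatter.Format(t.Timestamp, baseline, current))), where baseline and current are read from t.Value according to whatever shape your extremes items have.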