Threshold detection with reactive extensions


I have rewritten this question after spending some time on reactive extensions - I probably know enough to now be very dangerous so please be patient and thanks in advance.

I have the following code which detects levels:

static class LevelExtensions
{
    public static IEnumerable<double> GetCrossovers(this double[] self, double x1, double x2)
    {
        return from level in self
               where level >= x1 && level <= x2 || level <= x1 && level >= x2
               select level;
    }

    public static IObservable<ThresholdCrossedEvent> ThresholdDetection(this IObservable<double> source, double[] thresholds)
    {
        return source
                .Buffer(2, 1)
                .Where(x => x.Count == 2)
                .Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
                .Where(x => x.LevelsCrossed.ToList().Count > 0)
                .SelectMany(x => from level in x.LevelsCrossed select new ThresholdCrossedEvent(level, x.Previous, x.Current));
    }
}

public class ThresholdCrossedEvent
{
    public ThresholdCrossedEvent(double level, double previous, double current)
    {

        Threshold = level;
        Previous = previous;
        Current = current;
    }

    public double Threshold { get; set; }
    public double Previous { get; set; }
    public double Current { get; set; }
    public Direction SlopeDirection => Current >= Previous ? Direction.Up : Direction.Down;
    public bool IsTouching => Current == Threshold || Previous == Threshold;
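}

// Not shown in the original post; Up = 0 and Down = 1 are inferred from the SlopeDirection values in the serialized output further down.
public enum Direction { Up, Down }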


var feed = new double[] { 1, 2, 3, 5, 5.5, 6, 9, 12, 10, 9, 7.5, 6.5, 7 }.ToObservable();
var levels = new double[] { 5, 7, 8 };

feed
  .ThresholdDetection(levels)
  .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));

And it produces the events as expected, OK so far:

{"Threshold":5.0,"Previous":3.0,"Current":5.0,"SlopeDirection":0,"IsTouching":true}
{"Threshold":5.0,"Previous":5.0,"Current":5.5,"SlopeDirection":0,"IsTouching":true}
{"Threshold":7.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
{"Threshold":8.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
{"Threshold":8.0,"Previous":9.0,"Current":7.5,"SlopeDirection":1,"IsTouching":false}
{"Threshold":7.0,"Previous":7.5,"Current":6.5,"SlopeDirection":1,"IsTouching":false}
{"Threshold":7.0,"Previous":6.5,"Current":7.0,"SlopeDirection":0,"IsTouching":true}

Any performance smells?

Without prematurely optimizing, is anything here screaming performance issues? I will be processing 40,000+ messages per second.

My GetCrossovers level-crossing detection relies on a LINQ Where that would run thousands of times per second, and my levels sequence will be ordered low to high. Does LINQ optimize for that, or is there anything I should consider?

How would I handle the levels list to be updated at runtime?

I call OnNext on my feed with values posted from a message bus. Separately, via a user interface or command, I add or remove an element in the levels list, which would need to re-sort itself atomically after each modification.

Given the constant checking, this will be a highly contended array (though it will rarely be updated). What should be considered for this levels list update process?

Which structure could allow gated access in the most efficient way?

Having different lists of levels and calling the same subscription on it

I would have different "categories" of levels, each creating the same ThresholdCrossedEvent but would mean different things i.e. HistoricalThreshold or a WeeklyThreshold.

Is it OK (or preferable) to do the below as multiple subscriptions, or should I encode the threshold type into the alert and change the thresholds from a double[] to a class that contains the requisite fields?

feed
  .ThresholdDetection(historicalThresholds)
  .Subscribe(x => Console.WriteLine("Historical " + JsonConvert.SerializeObject(x)));

feed
  .ThresholdDetection(weeklyThresholds)
  .Subscribe(x => Console.WriteLine("Weekly: " + JsonConvert.SerializeObject(x)));

Solution

  • Your implementation looks OK. A few things:

    1. You want to model the changing levels as an observable as well.
    2. I'm no LINQ to Objects expert, but I think the conversion to a list (ToList()) is unnecessary and a mild performance hit, if you're worried about that. Obviously, test it.
    3. I hate LINQ query syntax, so I replaced it with method syntax in the sample below. I don't think that matters.

    The solution then looks like this:

    static class LevelExtensions
    {
        public static IEnumerable<double> GetCrossovers(this IEnumerable<double> self, double x1, double x2)
        {
            return self
                .Where(level => level >= x1 && level <= x2 || level <= x1 && level >= x2);
        }
    
        public static IObservable<ThresholdCrossedEvent> ThresholdDetection(this IObservable<double> source, IObservable<double[]> thresholdSource)
        {
            return thresholdSource
                .Select(thresholds => source
                    .Buffer(2, 1)
                    .Where(x => x.Count == 2)
                    .Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
                    .Where(x => x.LevelsCrossed.Any())
                    .SelectMany(x => x.LevelsCrossed.Select(lc => new ThresholdCrossedEvent(lc, x.Previous, x.Current)))
                )
                .Switch();
        }
    }
    

    This is a direct adaptation of your code: the only differences are the outer Select, the Switch, and the parameter types. For each levels notification, the outer Select creates a new observable. Switch means always subscribe to the latest one and drop the older ones, so you are always using the latest thresholds.
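To make the Switch behavior concrete, here is a minimal sketch, separate from the threshold code. The names SwitchDemo, outer, first and second are made up for illustration. It pushes two inner sequences through Switch and records which values reach the subscriber.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class SwitchDemo
{
    // Returns the values observed by a subscriber of outer.Switch().
    public static List<string> Run()
    {
        var seen = new List<string>();
        var outer = new Subject<IObservable<string>>();
        var first = new Subject<string>();
        var second = new Subject<string>();

        using (outer.Switch().Subscribe(x => seen.Add(x)))
        {
            outer.OnNext(first);        // Switch subscribes to `first`
            first.OnNext("first: a");   // delivered
            outer.OnNext(second);       // Switch unsubscribes from `first`, subscribes to `second`
            first.OnNext("first: b");   // dropped: `first` is no longer the latest inner sequence
            second.OnNext("second: a"); // delivered
        }

        return seen;
    }
}
```

Only "first: a" and "second: a" reach the subscriber. In ThresholdDetection the inner sequences are the per-thresholds pipelines, so each new thresholds array replaces the pipeline built for the previous one.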

    I tested it by running the following code:

    var feed = new double[] { 1, 2, 3, 5, 5.5, 6, 9, 12, 10, 9, 7.5, 6.5, 7 };
    var levels = new double[] { 5, 7, 8 };
    
    var feedSubject = new Subject<double>();
    var levelsSubject = new BehaviorSubject<double[]>(levels);
    
    feedSubject
        .ThresholdDetection(levelsSubject)
        .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
        
    foreach (var d in feed)
        feedSubject.OnNext(d);
    
    levelsSubject.OnNext(new double[] { 5, 8 });
    Console.WriteLine("---");
    
    foreach (var d in feed)
        feedSubject.OnNext(d);
    

    It runs through the feed array once with your original thresholds, then again with the 7 threshold removed. My output looks like this:

    {"Threshold":5.0,"Previous":3.0,"Current":5.0,"SlopeDirection":0,"IsTouching":true}
    {"Threshold":5.0,"Previous":5.0,"Current":5.5,"SlopeDirection":0,"IsTouching":true}
    {"Threshold":7.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
    {"Threshold":8.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
    {"Threshold":8.0,"Previous":9.0,"Current":7.5,"SlopeDirection":1,"IsTouching":false}
    {"Threshold":7.0,"Previous":7.5,"Current":6.5,"SlopeDirection":1,"IsTouching":false}
    {"Threshold":7.0,"Previous":6.5,"Current":7.0,"SlopeDirection":0,"IsTouching":true}
    ---
    {"Threshold":5.0,"Previous":3.0,"Current":5.0,"SlopeDirection":0,"IsTouching":true}
    {"Threshold":5.0,"Previous":5.0,"Current":5.5,"SlopeDirection":0,"IsTouching":true}
    {"Threshold":8.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
    {"Threshold":8.0,"Previous":9.0,"Current":7.5,"SlopeDirection":1,"IsTouching":false}
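One case the run above does not exercise: with a hot feed, every thresholds update makes Switch start a fresh Buffer(2, 1). As far as I can tell, a crossing between the last value before the update and the first value after it is therefore not reported. If that matters, a possible variant keeps one shared pair window and samples the latest thresholds with WithLatestFrom. This is a sketch under assumptions, not the tested solution above. The method name CrossingsWithLatestThresholds and the Demo class are hypothetical. It emits plain tuples rather than ThresholdCrossedEvent only to stay self-contained. Pairs that arrive before the first thresholds array are dropped.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class LatestThresholdExtensions
{
    // Hypothetical variant: one sliding window over the feed, paired with the latest thresholds.
    public static IObservable<(double Threshold, double Previous, double Current)> CrossingsWithLatestThresholds(
        this IObservable<double> source, IObservable<double[]> thresholdSource)
    {
        return source
            .Buffer(2, 1)                    // sliding pairs; never restarted by a thresholds update
            .Where(pair => pair.Count == 2)  // drop the partial buffer emitted on completion
            .WithLatestFrom(thresholdSource, (pair, thresholds) => (pair: pair, thresholds: thresholds))
            .SelectMany(x =>
            {
                var lo = Math.Min(x.pair[0], x.pair[1]);
                var hi = Math.Max(x.pair[0], x.pair[1]);
                // Same inclusive test as GetCrossovers, written with min/max.
                return x.thresholds
                    .Where(level => level >= lo && level <= hi)
                    .Select(level => (Threshold: level, Previous: x.pair[0], Current: x.pair[1]));
            });
    }
}

static class Demo
{
    public static List<string> Run()
    {
        var seen = new List<string>();
        var feed = new Subject<double>();
        var levels = new BehaviorSubject<double[]>(new double[] { 5 });

        using (feed.CrossingsWithLatestThresholds(levels)
                   .Subscribe(e => seen.Add($"{e.Threshold}: {e.Previous} -> {e.Current}")))
        {
            feed.OnNext(1);
            feed.OnNext(6);                     // pair (1, 6) crosses 5
            levels.OnNext(new double[] { 7 });  // thresholds change between two feed values
            feed.OnNext(8);                     // pair (6, 8) is still intact, so 7 is reported
        }

        return seen;
    }
}
```

Demo.Run() yields "5: 1 -> 6" followed by "7: 6 -> 8". The second event is the one that falls in the gap when the window is rebuilt on every thresholds change. Mapping the tuples onto ThresholdCrossedEvent is a one-line Select.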