c#, system.reactive, reactivex

Implementing moving average with ReactiveX based on timestamps


I'm using Rx.NET and I have an Observable that emits time series points (double, timestamp). Every time a new point arrives, I want to calculate the average value over, let's say, the last 30 seconds. I think I need some kind of overlapping Window/Buffer that is based on timestamps rather than on count.

I've found this topic with a SlidingWindow implementation, but I cannot figure out how to fit it to my problem.

EDIT:

Thanks to this I learned that I can use the Scan operator to buffer my points, so basically this solves the problem. But maybe there is a better way to do this?


Solution

  • Buffer and Window look forward: they collect the elements that arrive after a window opens. You want something that looks back from each new element. Scan is the best starting point:

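    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reactive;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;

    // Extension methods must live in a static class; the class name is arbitrary.
    public static class BackBufferExtensions
    {
        public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts)
        {
            return BackBuffer(source, ts, Scheduler.Default);
        }

        // For every source element, emits the elements that arrived within the last ts.
        public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler)
        {
            return source
                .Timestamp(scheduler) // stamp with the same clock that the age check below reads
                .Scan(new List<Timestamped<T>>(), (list, element) => list
                    .Where(ti => scheduler.Now - ti.Timestamp <= ts) // drop elements older than ts
                    .Concat(Enumerable.Repeat(element, 1))           // append the new element
                    .ToList()
                )
                .Select(list => list.Select(t => t.Value).ToList());
        }
    }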
    

    Once you have BackBuffer, or something like it, then the rest becomes easy:

    source
        .BackBuffer(TimeSpan.FromMilliseconds(70))
        .Select(list => list.Average())
        .Subscribe(average => Console.WriteLine(average));
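
BackBuffer stamps each element with its arrival time. Since the points in the question already carry their own timestamp, one possible variation is to filter on that embedded timestamp instead. The following is only a minimal sketch under a few assumptions: points are modelled as a (Value, Timestamp) tuple, they arrive in non-decreasing timestamp order, and the names MovingAverage and MovingAverageExtensions are made up for illustration (they are not part of Rx.NET).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class MovingAverageExtensions
{
    // Hypothetical helper: for each incoming point, averages the values of all
    // points whose own timestamp lies within `window` of the newest point.
    // Assumes timestamps never go backwards.
    public static IObservable<double> MovingAverage(
        this IObservable<(double Value, DateTimeOffset Timestamp)> source,
        TimeSpan window)
    {
        return source
            .Scan(new List<(double Value, DateTimeOffset Timestamp)>(), (list, p) => list
                .Where(q => p.Timestamp - q.Timestamp <= window) // keep points still inside the window
                .Append(p)                                       // add the newest point
                .ToList())
            .Select(list => list.Average(q => q.Value));
    }
}

public static class Program
{
    public static void Main()
    {
        var t0 = new DateTimeOffset(2024, 1, 1, 0, 0, 0, TimeSpan.Zero);

        // Sample data for illustration: points at 0 s, 10 s and 35 s.
        var points = new[]
        {
            (Value: 10.0, Timestamp: t0),
            (Value: 20.0, Timestamp: t0.AddSeconds(10)),
            (Value: 30.0, Timestamp: t0.AddSeconds(35)),
        };

        points.ToObservable()
            .MovingAverage(TimeSpan.FromSeconds(30))
            .Subscribe(average => Console.WriteLine(average));
    }
}
```

With this sample data the program prints 10, 15 and 25: by the time the third point arrives, the first one is 35 seconds old and has dropped out of the 30-second window. Like BackBuffer, this rebuilds the list on every element, which is fine for small windows; for very dense streams a Queue that dequeues expired points from the front would probably be a cheaper accumulator.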