I'm using Rx.Net and I have Observable that emits time series points (double, timestamp). Every time new point arrives I want to calculate average value from lets say last 30 seconds. I think I need some kind of overlapping Window/Buffer not based on count but timestamp.
I've found this topic with SlidingWindow implementation, but I cannot figure out how to fit this to my problem.
EDIT:
Thanks to this I learned that I can use Scan operator and buffer my points, so basicly this solves the problem. But maybe there is better way to do this?
Buffer
and Window
look forward, you want something that looks back. Scan
is the best starting point:
public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts)
{
return BackBuffer(source, ts, Scheduler.Default);
}
public static IObservable<List<T>> BackBuffer<T>(this IObservable<T> source, TimeSpan ts, IScheduler scheduler)
{
return source
.Timestamp()
.Scan(new List<Timestamped<T>>(), (list, element) => list
.Where(ti => scheduler.Now - ti.Timestamp <= ts)
.Concat(Enumerable.Repeat(element, 1))
.ToList()
)
.Select(list => list.Select(t => t.Value).ToList());
}
Once you have BackBuffer
, or something like it, then the rest becomes easy:
source
.BackBuffer(TimeSpan.FromMilliseconds(70))
.Select(list => list.Average())
.Subscribe(average => Console.WriteLine(average));