Search code examples
c#system.reactive

reactive extensions sliding time window


I have a sequence of stock ticks coming in and I want to take all the data in the last hour and do some processing on it. I am trying to achieve this with reactive extensions 2.0. I read on another post to use Interval but i think that is deprecated.


Solution

  • Would this extension method solve your problem?

    public static IObservable<T[]> RollingBuffer<T>(
        this IObservable<T> @this,
        TimeSpan buffering)
    {
        return Observable.Create<T[]>(o =>
        {
            var list = new LinkedList<Timestamped<T>>();
            return @this.Timestamp().Subscribe(tx =>
            {
                list.AddLast(tx);
                while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
                {
                    list.RemoveFirst();
                }
                o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
            }, ex => o.OnError(ex), () => o.OnCompleted());
        });
    }