c# system.reactive rx.net

Trouble Implementing a Sliding Window in Rx


I created a SlidingWindow operator for Reactive Extensions because I want to easily monitor things like rolling averages. As a simple example, I want to subscribe to mouse events, but each time there's an event I want to receive the last three events (rather than waiting for every third event to receive the last three). That's why the Window overloads I found don't seem to give me what I need out of the box.

This is what I came up with. I fear that it might not be the most performant solution, given its frequent List operations:

public static IObservable<List<T>> SlidingWindow<T>(this IObservable<T> seq, int length)
{
    var seed = new List<T>();

    Func<List<T>, T, List<T>> accumulator = (list, arg2) =>
    {
        list.Add(arg2);

        if (list.Count > length)
            list.RemoveRange(0, (list.Count - length));

        return list;
    };

    return seq.Scan(seed, accumulator)
                .Where(list => list.Count == length);
}

It can be called this way:

var rollingSequence = Observable.Range(1, 5).SlidingWindow(3).ToEnumerable();

However, to my great surprise, instead of receiving the expected results

1,2,3
2,3,4
3,4,5

I receive the results

2,3,4
3,4,5
3,4,5

Any insights would be much appreciated!


Solution

  • Try this instead. I'd have to sit down and think about its relative performance, but it's likely at least as good, and way easier to read:

    public static IObservable<IList<T>> SlidingWindow<T>(
           this IObservable<T> src, 
           int windowSize)
    {
        var feed = src.Publish().RefCount();    
        // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list  
        return Observable.Zip(
           Enumerable.Range(0, windowSize)
               .Select(skip => feed.Skip(skip))
               .ToArray());
    }
    

    Test rig:

    var source = Observable.Range(0, 10);
    var query = source.SlidingWindow(3);
    using(query.Subscribe(Console.WriteLine))
    {               
        Console.ReadLine();
    }
    

    Output:

    ListOf(0,1,2)
    ListOf(1,2,3)
    ListOf(2,3,4)
    ListOf(3,4,5)
    ListOf(4,5,6)
    ...
    

    EDIT: As an aside, I find myself compulsively adding .Publish().RefCount() ever since being burned once by not doing it. I don't think it's strictly required here, though.

    EDIT for yzorg:

    If you augment the method like so, you'll see the runtime behavior more clearly. The Select lambda runs only windowSize times, when the query is built, not once per element:

    public static IObservable<IList<T>> SlidingWindow<T>(
        this IObservable<T> src, 
        int windowSize)
    {
        var feed = src.Publish().RefCount();    
        // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list  
        return Observable.Zip(
            Enumerable.Range(0, windowSize)
                .Select(skip =>
                {
                    Console.WriteLine("Skipping {0} els", skip);
                    return feed.Skip(skip);
                })
                .ToArray());
    }
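
As for why the question's Scan version printed 2,3,4 / 3,4,5 / 3,4,5: a likely explanation is that the accumulator mutates and returns the same List<T> instance every time, so each notification carries a reference to one shared list. ToEnumerable() queues those references, and by the time the consumer reads them the list has already been changed by later elements. A minimal sketch of the same Scan approach that emits a snapshot per notification might look like this (SlidingWindowSnapshot and SlidingWindowSketch are hypothetical names, not from the original post):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class SlidingWindowSketch
{
    // Hypothetical variant of the question's operator; the name is an assumption.
    public static IObservable<IList<T>> SlidingWindowSnapshot<T>(
        this IObservable<T> seq,
        int length)
    {
        // Defer gives every subscriber its own queue instead of one shared seed.
        return Observable.Defer(() =>
            seq.Scan(new Queue<T>(), (window, item) =>
                {
                    window.Enqueue(item);
                    if (window.Count > length)
                        window.Dequeue(); // drop the oldest element
                    return window;
                })
               .Where(window => window.Count == length)
               // Copy before emitting so later mutations cannot alter this window.
               .Select(window => (IList<T>)window.ToArray()));
    }
}
```

With this sketch, Observable.Range(1, 5).SlidingWindowSnapshot(3).ToEnumerable() should yield 1,2,3 then 2,3,4 then 3,4,5, because every emitted window is an independent array. The cost is one array allocation per notification, which is the trade-off for not sharing mutable state with subscribers.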