Search code examples
c#reactive-programmingsystem.reactivesliding-windowrx.net

Rx.NET How do I buffer stream data as a moving (sliding) window without delay?


I'm working on connecting to trading data with Rx.NET library and what I want to do is continuously buffer last 100 seconds of data and analyze it every 2 seconds. I'm using the following Buffer method overload:

        tradeStream
            .Buffer(TimeSpan.FromSeconds(100), TimeSpan.FromSeconds(2))
            .Subscribe(data =>
            {
                //...
            });

The problem now is it works as 100 seconds buffer + it waits for 2 seconds. Is there a way to take a sort of "snapshot" of last 100 seconds immediately with certain interval?


Solution

  • Alright, I must confess I was wrong in my conclusions thinking it's a buffer() extension issue.

    The reason why a delay happened after buffer emission was a long running task inside observer. I have found a solution that does what I expect. I just need to collect some data and call a method to process it in async thread. And even if processing takes longer than buffering interval, the sequence is still consistent.

    var tlist = Observable.Range(1, 100)
    .Zip(Observable.Interval(TimeSpan.FromMilliseconds(1000)), (i, t) => i);
    
    var list = tlist.Publish();
    
    
    list.Subscribe(b => {
    Console.WriteLine(b);
    });
    
    list
    .Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
    .SelectMany(async b =>
    {
        Console.WriteLine(string.Join("_", b));
        await Task.Delay(4000);
        return Task.FromResult(0);
    })
    .Subscribe();
    
    list.Connect();
    

    code output