I'm connecting to a trading data feed with the Rx.NET library. I want to continuously buffer the last 100 seconds of data and analyze it every 2 seconds. I'm using the following Buffer overload:
tradeStream
    .Buffer(TimeSpan.FromSeconds(100), TimeSpan.FromSeconds(2))
    .Subscribe(data =>
    {
        //...
    });
The problem is that it seems to behave as a 100-second buffer followed by an extra 2-second wait. Is there a way to take a "snapshot" of the last 100 seconds immediately, at a fixed interval?
Alright, I must confess I was wrong to conclude that this was a problem with the Buffer() extension method.
The delay after each buffer emission was caused by a long-running task inside the observer. I have found a solution that does what I expect: collect the data, then hand it to a method that processes it asynchronously, so the observer is not blocked. Even if processing takes longer than the buffering interval, the sequence is still consistent.
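using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Emit 1..100, one value per second.
var tlist = Observable.Range(1, 100)
    .Zip(Observable.Interval(TimeSpan.FromMilliseconds(1000)), (i, t) => i);
// Publish so both subscriptions below share one underlying sequence.
var list = tlist.Publish();
list.Subscribe(b =>
{
    Console.WriteLine(b);
});
list
    // 5-second buffers, with a new buffer starting every second.
    .Buffer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
    .SelectMany(async b =>
    {
        Console.WriteLine(string.Join("_", b));
        // Simulated processing that takes longer than the 1-second shift.
        await Task.Delay(4000);
        // Return a plain value; the async lambda already wraps it in a Task.
        return 0;
    })
    .Subscribe();
// Start the shared sequence once both subscriptions are in place.
list.Connect();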
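For reference, here is a minimal sketch of the same idea packaged as a reusable helper. It is only an illustration. The names AnalyzeSliding and CountAsync are hypothetical, the tick stream is a stand-in for tradeStream, and the window and shift are shortened so the demo finishes quickly.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class SlidingWindowSketch
{
    // Hypothetical helper: buffers the last `window` of data every `shift`
    // and passes each buffer to an asynchronous analysis function.
    static IObservable<TResult> AnalyzeSliding<T, TResult>(
        this IObservable<T> source,
        TimeSpan window,
        TimeSpan shift,
        Func<IList<T>, Task<TResult>> analyze)
    {
        return source
            .Buffer(window, shift)
            // The Task-returning SelectMany overload does not block the
            // observer, so buffers keep arriving while earlier ones are
            // still being analyzed.
            .SelectMany(buffer => analyze(buffer));
    }

    // Hypothetical analysis step, deliberately slower than the shift below.
    static async Task<int> CountAsync(IList<long> buffer)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(500));
        return buffer.Count;
    }

    static async Task Main()
    {
        // Stand-in for tradeStream (an assumption): one tick every 100 ms.
        IObservable<long> ticks = Observable.Interval(TimeSpan.FromMilliseconds(100));

        // Shortened demo values; the question would use 100 s and 2 s.
        IList<int> counts = await ticks
            .AnalyzeSliding(
                TimeSpan.FromSeconds(1),
                TimeSpan.FromMilliseconds(200),
                buffer => CountAsync(buffer))
            .Take(3)
            .ToList();

        foreach (int count in counts)
        {
            Console.WriteLine($"Analyzed a buffer of {count} ticks");
        }
    }
}
```

With the values from the question, the call would be tradeStream.AnalyzeSliding(TimeSpan.FromSeconds(100), TimeSpan.FromSeconds(2), ...). One caveat: SelectMany yields results as the tasks complete, so if analysis times vary the results may arrive out of order. If strict ordering matters, Select(b => Observable.FromAsync(() => analyze(b))).Concat() is one alternative, at the cost of processing the buffers one at a time.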