Given an IObservable<T>
is there a way to use Throttle
behaviour (reset a timer when an item is added, but have it return a collection of all the items added within that time?
Buffer
provides a similar functionality it that it chunks the data up into IList<T>
on every time span or count. But I need that time to reset each time an item is added.
I've seen a similar question here, Does reactive extensions support rolling buffers?, but the answers don't seem ideal and it's a little old so I wondered if the release version of Rx-Main now supports this functionality out the box.
As I answered in the other post, yes you can! Using the Throttle
and Window
methods of Observable
:
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
var closes = stream.Throttle(delay);
return stream.Window(() => closes).SelectMany(window => window.ToList());
}