Search code examples
c#system.reactiverx.net

Reactive Throttle returning all items added within the TimeSpan


Given an IObservable<T> is there a way to use Throttle behaviour (reset a timer when an item is added, but have it return a collection of all the items added within that time?

Buffer provides a similar functionality it that it chunks the data up into IList<T> on every time span or count. But I need that time to reset each time an item is added.

I've seen a similar question here, Does reactive extensions support rolling buffers?, but the answers don't seem ideal and it's a little old so I wondered if the release version of Rx-Main now supports this functionality out the box.


Solution

  • As I answered in the other post, yes you can! Using the Throttle and Window methods of Observable:

    public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
    {
        var closes = stream.Throttle(delay);
        return stream.Window(() => closes).SelectMany(window => window.ToList());
    }