My problem is sort of like the one the Nagle algorithm was created to solve, but not exactly. What I'd like is to buffer the OnNext
notifications from an IObservable<T>
into a sequence of IObservable<IList<T>>
s like so:
T
notification arrives, add it to a buffer and start a countdownT
notification arrives before the countdown expires, add it to the buffer and restart the countdownT
notifications as a single aggregate IList<T>
notification.IObservable<IList<T>> Buffer(this IObservable<T>, Timespan, int, IScheduler)
looked promising, but it appears to send aggregate notifications out at regular intervals rather than doing the "start the timer when the first notification arrives and restart it when additional ones arrive" behavior I'd like, and it also sends out an empty list at the end of each time window if no notifications have been produced from below.
I do not want to drop any of the T
notifications; just buffer them.
Does anything like this exist, or do I need to write my own?
Some similar questions exist on SO but not exactly like this. Here's an extension method that does the trick.
public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
(this IObservable<TSource> source,
int maxAmount, TimeSpan threshold)
{
return Observable.Create<IList<TSource>>((obs) =>
{
return source.GroupByUntil(_ => true,
g => g.Throttle(threshold).Select(_ => Unit.Default)
.Merge( g.Buffer(maxAmount).Select(_ => Unit.Default)))
.SelectMany(i => i.ToList())
.Subscribe(obs);
});
}