c#, system.reactive, reactive-programming

"Buffer until quiet" behavior from Reactive?


My problem is sort of like the one the Nagle algorithm was created to solve, but not exactly. What I'd like is to buffer the OnNext notifications from an IObservable<T> into an IObservable<IList<T>>, like so:

  1. When the first T notification arrives, add it to a buffer and start a countdown
  2. If another T notification arrives before the countdown expires, add it to the buffer and restart the countdown
  3. Once the countdown expires (i.e. the producer has been silent for some length of time), forward all the buffered T notifications as a single aggregate IList<T> notification.
  4. If the buffer size grows beyond some maximum before the countdown expires, send it anyway.

IObservable<IList<T>> Buffer(this IObservable<T>, TimeSpan, int, IScheduler) looked promising, but it appears to send aggregate notifications at regular intervals rather than starting the timer when the first notification arrives and restarting it when additional ones arrive, which is the behavior I'd like. It also sends out an empty list at the end of each time window if no notifications have arrived from the source.
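To make the complaint about the built-in overload concrete, here is a small sketch of how it behaves. It assumes the System.Reactive package and drives time with a HistoricalScheduler (virtual time) so the run does not depend on wall-clock timing; the class name FixedWindowBufferDemo and the chosen durations are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class FixedWindowBufferDemo
{
    // Returns the size of every list that Buffer(TimeSpan, int, IScheduler) emits.
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced by hand
        var source = new Subject<int>();
        var sizes = new List<int>();

        using (source.Buffer(TimeSpan.FromSeconds(1), 10, scheduler)
                     .Subscribe(list => sizes.Add(list.Count)))
        {
            // One item arrives 100 ms before the first one-second window closes.
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(900));
            source.OnNext(1);

            // Then the source stays silent while several windows elapse.
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3));
        }
        return sizes;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

The first list should contain the single item even though the source had been quiet for only 100 ms when the window closed, and the windows that elapse afterwards should show up as lists of size 0. Those are the two behaviors described above.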

I do not want to drop any of the T notifications; just buffer them.

Does anything like this exist, or do I need to write my own?


Solution

  • Some similar questions exist on SO but not exactly like this. Here's an extension method that does the trick.

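    using System;
    using System.Collections.Generic;
    using System.Reactive;
    using System.Reactive.Linq;

    // Extension methods must live in a static class; the class name here is arbitrary.
    public static class ObservableExtensions
    {
        public static IObservable<IList<TSource>> BufferWithThrottle<TSource>
                                                  (this IObservable<TSource> source,
                                                   int maxAmount, TimeSpan threshold)
        {
            // Put every element into one group; the group stays open until its duration observable fires.
            return source.GroupByUntil(_ => true,
                                       // Close the group after `threshold` of silence...
                                       g => g.Throttle(threshold).Select(_ => Unit.Default)
                                             // ...or as soon as it holds maxAmount elements.
                                             .Merge(g.Buffer(maxAmount).Select(_ => Unit.Default)))
                         // Emit each closed group's elements as a single list.
                         .SelectMany(g => g.ToList());
        }
    }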
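A usage sketch may help show how the extension method above lines up with the four requirements in the question. To keep the run deterministic it uses a hypothetical overload that takes an IScheduler and passes it to Throttle (the answer's method has no such parameter), driven by a HistoricalScheduler. The class name BufferUntilQuietDemo and the timings are assumptions for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferUntilQuietDemo
{
    // Hypothetical overload: same pipeline as the answer, plus an explicit scheduler for Throttle.
    public static IObservable<IList<T>> BufferWithThrottle<T>(
        this IObservable<T> source, int maxAmount, TimeSpan threshold, IScheduler scheduler)
    {
        return source
            .GroupByUntil(_ => true,
                          g => g.Throttle(threshold, scheduler).Select(_ => Unit.Default)
                                .Merge(g.Buffer(maxAmount).Select(_ => Unit.Default)))
            .SelectMany(g => g.ToList());
    }

    // Feeds a few values through the operator and returns each emitted batch as "a,b,c".
    public static List<string> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced by hand
        var source = new Subject<int>();
        var batches = new List<string>();

        using (source.BufferWithThrottle(3, TimeSpan.FromSeconds(1), scheduler)
                     .Subscribe(batch => batches.Add(string.Join(",", batch))))
        {
            source.OnNext(1);                                    // starts the countdown
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));
            source.OnNext(2);                                    // restarts the countdown
            scheduler.AdvanceBy(TimeSpan.FromSeconds(2));        // silence longer than the threshold

            source.OnNext(3);
            source.OnNext(4);
            source.OnNext(5);                                    // third item reaches maxAmount

            source.OnNext(6);
            source.OnCompleted();                                // completion releases what is left
        }
        return batches;
    }

    public static void Main()
    {
        foreach (var batch in Run())
        {
            Console.WriteLine("[" + batch + "]");
        }
    }
}
```

With these inputs the batches should come out as [1,2] after the quiet period, [3,4,5] when the size limit is hit, and [6] when the source completes, so no T notification is dropped.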