Tags: c#, .net, enumerator, backpressure

An enumerator wrapper that pre-buffers a number of items from the underlying enumerator in advance


Suppose I have some IEnumerator<T> which does a fair amount of processing inside the MoveNext() method.

The code consuming from that enumerator does not just consume as fast as data is available, but occasionally waits (the specifics of which are irrelevant to my question) in order to synchronize the time when it needs to resume consumption. But when it does the next call to MoveNext(), it needs the data as fast as possible.

One way would be to pre-consume the whole stream into some list or array structure for instant enumeration. That would be a waste of memory however, as at any single point in time, only one item is in use, and it would be prohibitive in cases where the whole data does not fit into memory.

So is there something generic in .NET that wraps an enumerator/enumerable so that it asynchronously pre-iterates the underlying enumerator a few items in advance and buffers the results? The wrapper would always keep a number of items in its buffer, so the caller's MoveNext() never has to wait. Obviously, items consumed, i.e. iterated over by a subsequent MoveNext() from the caller, would be removed from the buffer.

N.B. Part of what I'm trying to do is also called backpressure and, in the Rx world, has already been implemented in RxJava and is under discussion in Rx.NET. Rx (observables that push data) can be considered the opposite approach to enumerators (which let the consumer pull data). Backpressure is relatively easy in the pulling approach, as my answer shows: just pause consumption. It's harder when pushing, as it requires an additional feedback mechanism.
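To make the "backpressure is easy when pulling" point concrete, here is a minimal sketch (all names are illustrative, not from the question): the consumer's pause itself is the backpressure, because an iterator method does no work until the next MoveNext() call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

class PullBackpressureDemo
{
    // Illustrative producer: each item is "produced" lazily,
    // only when the consumer actually pulls it.
    public static IEnumerable<int> Produce()
    {
        for (int i = 0; i < 4; i++)
        {
            Console.WriteLine($"producing {i}");
            yield return i;
        }
    }

    static void Main()
    {
        foreach (var item in Produce())
        {
            // This pause is the backpressure: while the consumer waits,
            // no MoveNext() happens, so the producer sits idle.
            Thread.Sleep(200);
            Console.WriteLine($"consumed {item}");
        }
    }
}
```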


Solution

  • A more concise alternative to your custom enumerable class is to do this:

    public static IEnumerable<T> Buffer<T>(this IEnumerable<T> source, int bufferSize)
    {
        // Bounded capacity: Add blocks once bufferSize items are waiting,
        // so the background producer stays at most bufferSize items ahead.
        var queue = new BlockingCollection<T>(bufferSize);

        Task.Run(() => {
            try
            {
                foreach (var i in source) queue.Add(i);
            }
            finally
            {
                // Signal completion even if the source throws; otherwise
                // GetConsumingEnumerable would block the consumer forever.
                queue.CompleteAdding();
            }
        });

        return queue.GetConsumingEnumerable();
    }
    

    This can be used as:

    var slowEnumerable = GetMySlowEnumerable();
    var buffered = slowEnumerable.Buffer(10); // Populates up to 10 items on a background thread
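    Putting it together, here is a self-contained sketch of the pattern; the slow source and its timings are made up for illustration:

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    static class BufferDemo
    {
        // Hypothetical slow source: simulates expensive work in MoveNext().
        public static IEnumerable<int> SlowSource()
        {
            for (int i = 0; i < 5; i++)
            {
                Thread.Sleep(100); // pretend each item is costly to produce
                yield return i;
            }
        }

        public static IEnumerable<T> Buffer<T>(this IEnumerable<T> source, int bufferSize)
        {
            var queue = new BlockingCollection<T>(bufferSize);
            Task.Run(() =>
            {
                try
                {
                    foreach (var i in source) queue.Add(i);
                }
                finally
                {
                    queue.CompleteAdding();
                }
            });
            return queue.GetConsumingEnumerable();
        }

        static void Main()
        {
            foreach (var item in SlowSource().Buffer(3))
            {
                Console.WriteLine(item);
                // While the consumer pauses here, the background task
                // keeps refilling the buffer, up to 3 items ahead.
                Thread.Sleep(250);
            }
        }
    }
    ```

    Note that the bounded BlockingCollection is what caps memory use: the producer blocks on Add once the buffer is full, so at most bufferSize items are held at any time.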