
Streaming data via IEnumerable & TPL Dataflow


I am getting items from an upstream API which is quite slow. I am trying to speed this up by using TPL Dataflow to make several requests in parallel and bring the results together, like this:

class Stuff
{
    int Id { get; }
}

async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();

async Task<IEnumerable<Stuff>> GetLotsOfStuff(IEnumerable<int> ids)
{
    var bagOfStuff = new ConcurrentBag<Stuff>();

    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5
    };

    var processor = new ActionBlock<int>(async id =>
    {
        bagOfStuff.Add(await GetStuffById(id));
    }, options);

    foreach (int id in ids)
    {
        processor.Post(id);
    }

    processor.Complete();
    await processor.Completion;

    return bagOfStuff.ToArray();
}

The problem is that I have to wait until I have finished querying the entire collection of Stuff before I can return it to the caller. What I would prefer is that, whenever any of the parallel queries returns an item, I return that item in a yield return fashion. Then I wouldn't need to return an async Task<IEnumerable<Stuff>>; I could just return an IEnumerable<Stuff>, and the caller's iteration would advance as soon as each item arrives.

I tried doing it like this:

IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5
    };

    var processor = new ActionBlock<int>(async id =>
    {
        yield return await GetStuffById(id);
    }, options);

    foreach (int id in ids)
    {
        processor.Post(id);
    }

    processor.Complete();
    processor.Completion.Wait();

    yield break;
}

But I get this compiler error:

The yield statement cannot be used inside an anonymous method or lambda expression

How can I restructure my code?


Solution

  • You can return an IEnumerable, but to do so you must block the current thread. You need a TransformBlock to process the ids, and a feeder task that asynchronously feeds the ids into the TransformBlock. The current thread then enters a blocking loop, waiting for processed items to yield:

    static IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
    {
        using var completionCTS = new CancellationTokenSource();
    
        var processor = new TransformBlock<int, Stuff>(async id =>
        {
            return await GetStuffById(id);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            BoundedCapacity = 50, // Avoid buffering millions of ids
            CancellationToken = completionCTS.Token
        });
    
        var feederTask = Task.Run(async () =>
        {
            try
            {
                foreach (int id in ids)
                    if (!await processor.SendAsync(id)) break;
            }
            finally { processor.Complete(); }
        });
    
        try
        {
            while (processor.OutputAvailableAsync().Result)
                while (processor.TryReceive(out var stuff))
                    yield return stuff;
        }
        finally // This runs when the caller exits the foreach loop
        {
            completionCTS.Cancel(); // Cancel the TransformBlock if it's still running
        }
    
        Task.WaitAll(feederTask, processor.Completion); // Propagate all exceptions
    }
    

    No ConcurrentBag is needed, since the TransformBlock has an internal output buffer. The tricky part is handling the case where the caller abandons the enumeration of the IEnumerable<Stuff> early, either by breaking out of the loop or because an exception is thrown. In that case you don't want the feeder task to keep pumping ids out of the IEnumerable<int> until the end. Fortunately there is a solution: when the yielding loop is enclosed in a try/finally block, the finally block runs as soon as the caller's foreach disposes the enumerator. There the TransformBlock is canceled, so pending and subsequent SendAsync calls return false and the feeder task exits its loop in a timely manner.
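
The try/finally trick relies on a language guarantee: foreach disposes the enumerator when the loop is exited early, and disposing an iterator runs its pending finally blocks. The following minimal sketch shows this in isolation, without Dataflow; the class and method names (IteratorFinallyDemo, Numbers, BreakAfter) are hypothetical and chosen only for the demonstration.

```csharp
using System;
using System.Collections.Generic;

public static class IteratorFinallyDemo
{
    // An endless iterator that records each step in `log`, plus when its finally block runs.
    public static IEnumerable<int> Numbers(List<string> log)
    {
        try
        {
            for (int i = 0; ; i++)
            {
                log.Add($"yield {i}");
                yield return i;
            }
        }
        finally
        {
            // Runs when the consumer disposes the enumerator; foreach does this
            // automatically when the loop is exited by break, return, or an exception.
            log.Add("finally");
        }
    }

    // Consumes Numbers() and abandons the enumeration once `stopAt` is seen.
    public static List<string> BreakAfter(int stopAt)
    {
        var log = new List<string>();
        foreach (int n in Numbers(log))
        {
            if (n == stopAt) break;
        }
        return log;
    }

    public static void Main()
    {
        // Prints: yield 0, yield 1, yield 2, finally
        Console.WriteLine(string.Join(", ", BreakAfter(2)));
    }
}
```

In GetLotsOfStuff above, that finally block is where completionCTS.Cancel() is called.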

    An alternative implementation could remove the need for a feeder task by enumerating the ids, feeding the block, and yielding results in a single loop. In that case you would want a lag between pumping and yielding, so that several ids are already being processed before the loop blocks waiting for the first result. MoreLinq's Lag (or Lead) extension method could be handy for this.


    Update: Here is a different implementation that enumerates and yields in the same loop. To achieve the desired lag, the source enumerable is right-padded with dummy elements, as many as the degree of concurrency.

    This implementation is generic, instead of being hard-coded to int and Stuff.

    public static IEnumerable<TResult> Transform<TSource, TResult>(
        IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
        int degreeOfConcurrency)
    {
        var processor = new TransformBlock<TSource, TResult>(async item =>
        {
            return await taskFactory(item);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = degreeOfConcurrency
        });
    
        var paddedSource = source.Select(item => (item, true))
            .Concat(Enumerable.Repeat((default(TSource), false), degreeOfConcurrency));
        int index = -1;
        bool completed = false;
        foreach (var (item, hasValue) in paddedSource)
        {
            index++;
            if (hasValue) { processor.Post(item); }
            else if (!completed) { processor.Complete(); completed = true; }
            if (index >= degreeOfConcurrency)
            {
                if (!processor.OutputAvailableAsync().Result) break; // Blocking call
                if (!processor.TryReceive(out var result))
                    throw new InvalidOperationException(); // Should never happen
                yield return result;
            }
        }
        processor.Completion.Wait();
    }
    

    Usage example:

    IEnumerable<Stuff> lotsOfStuff = Transform(ids, GetStuffById, 5);
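
To see why right-padding with degreeOfConcurrency dummy elements yields each result exactly once, the following sketch replays Transform's index bookkeeping without Dataflow. A Queue<T> stands in for the TransformBlock; it models only the block's in-order output (TransformBlock preserves input order by default), not its concurrency, and the names PaddingLagDemo and Replay are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PaddingLagDemo
{
    // Replays the index bookkeeping of Transform. A Queue<T> stands in for the
    // TransformBlock: it only models the block's in-order output, not concurrency.
    public static List<string> Replay<T>(IEnumerable<T> source, int lag)
    {
        var steps = new List<string>();
        var inFlight = new Queue<T>();

        // Same padding as Transform: real items flagged true, `lag` dummies flagged false.
        var paddedSource = source.Select(item => (item, true))
            .Concat(Enumerable.Repeat((default(T), false), lag));

        int index = -1;
        foreach (var (item, hasValue) in paddedSource)
        {
            index++;
            if (hasValue)
            {
                inFlight.Enqueue(item); // corresponds to processor.Post(item)
                steps.Add($"post {item}");
            }
            if (index >= lag)
            {
                // corresponds to TryReceive followed by yield return
                steps.Add($"receive {inFlight.Dequeue()}");
            }
        }
        return steps;
    }

    public static void Main()
    {
        // Prints: post 1, post 2, post 3, receive 1, post 4, receive 2, receive 3, receive 4
        Console.WriteLine(string.Join(", ", Replay(new[] { 1, 2, 3, 4 }, 2)));
    }
}
```

With a source of N items and a lag of d, the loop runs N + d times: the first d iterations receive nothing, and each of the remaining N iterations receives exactly one result. The d dummy elements provide the extra iterations needed to drain the last results after the source is exhausted.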
    

    Both implementations can easily be modified to return an IAsyncEnumerable<T> (C# 8 and later) instead of an IEnumerable<T>, so that the calling thread is not blocked.
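
As an illustration of that last point, here is a sketch of how the first (feeder-task) implementation might look as an async iterator, made generic like Transform. It assumes C# 8 or later and the same System.Threading.Tasks.Dataflow package used above; the names StreamingTransform and TransformAsync, the optional CancellationToken parameter, and the BoundedCapacity of ten times the degree of concurrency are arbitrary choices for this sketch, not part of the original answer.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class StreamingTransform
{
    // Async-iterator variant of the feeder-task implementation.
    // degreeOfConcurrency is assumed to be a positive number.
    public static async IAsyncEnumerable<TResult> TransformAsync<TSource, TResult>(
        IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> taskFactory,
        int degreeOfConcurrency,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Canceled either by the caller's token or when the consumer stops early.
        using var completionCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        var processor = new TransformBlock<TSource, TResult>(taskFactory,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = degreeOfConcurrency,
                BoundedCapacity = degreeOfConcurrency * 10, // arbitrary bound for this sketch
                CancellationToken = completionCts.Token
            });

        // Feeds the block in the background; SendAsync waits while the block is full
        // and returns false once the block stops accepting input (e.g. when canceled).
        var feederTask = Task.Run(async () =>
        {
            try
            {
                foreach (TSource item in source)
                    if (!await processor.SendAsync(item).ConfigureAwait(false)) break;
            }
            finally { processor.Complete(); }
        });

        try
        {
            // Awaits instead of blocking; the loop ends once the block has completed.
            while (await processor.OutputAvailableAsync().ConfigureAwait(false))
                while (processor.TryReceive(out TResult result))
                    yield return result;
        }
        finally // Runs when the consumer leaves the await foreach loop early
        {
            completionCts.Cancel();
        }

        // Propagate any exception from the source, the task factory, or cancellation.
        await Task.WhenAll(feederTask, processor.Completion).ConfigureAwait(false);
    }

    public static async Task Main()
    {
        // Hypothetical slow call standing in for GetStuffById.
        static async Task<int> SlowSquare(int x)
        {
            await Task.Delay(50);
            return x * x;
        }

        // Prints 1, 4, 9, 16, 25 on separate lines (TransformBlock keeps input order by default).
        await foreach (int value in TransformAsync(new[] { 1, 2, 3, 4, 5 }, SlowSquare, 3))
            Console.WriteLine(value);
    }
}
```

The caller consumes it with await foreach; a CancellationToken supplied through WithCancellation(token) flows into the [EnumeratorCancellation] parameter.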