Tags: c#, asynchronous, async-await, ienumerable

How can I asynchronously transform one IEnumerable to another, just like LINQ's Select(), but using await on every transformed item?


Consider this situation:

class Product { }

interface IWorker
{
    Task<Product> CreateProductAsync();
}

I am given an IEnumerable<IWorker> workers and need to create an IEnumerable<Product> from it, which I have to pass to another method that I cannot alter:

void CheckProducts(IEnumerable<Product> products);

This method needs access to the entire IEnumerable<Product>. It is not possible to subdivide it and call CheckProducts on multiple subsets.

One obvious solution is this:

CheckProducts(workers.Select(worker => worker.CreateProductAsync().Result));

But this blocks, of course, so it would only be my last resort. Syntactically, this is precisely what I need, just without the blocking.

I cannot use await inside the lambda I pass to Select(): I would have to mark it async, which would make it return a Task<Product>, and I would have gained nothing. In the end I need an IEnumerable<Product>, not an IEnumerable<Task<Product>>.

It is important to know that the order in which the workers create their products matters, and their work must not overlap. Otherwise, I would do this:

async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
    var tasks = workers.Select(worker => worker.CreateProductAsync());
    return await Task.WhenAll(tasks);
}

But unfortunately, with Task.WhenAll() every CreateProductAsync() call is started before any of them has finished, so the workers run concurrently, while I need them executed sequentially.
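A minimal sketch of why the Task.WhenAll() version overlaps. It assumes a hypothetical ProbedWorker that simulates 50 ms of asynchronous work and a hypothetical ConcurrencyProbe that counts calls in flight; neither is part of the question. Task.WhenAll() enumerates the lazy Select(), and each CreateProductAsync() call returns to its caller at its first await, so all calls are started before the first one finishes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class Product { }

interface IWorker
{
    Task<Product> CreateProductAsync();
}

// Hypothetical helper: tracks how many CreateProductAsync() calls are in flight at once.
class ConcurrencyProbe
{
    private readonly object gate = new object();
    private int active;

    public int MaxActive { get; private set; }

    public void Enter()
    {
        lock (gate)
        {
            active++;
            MaxActive = Math.Max(MaxActive, active);
        }
    }

    public void Exit()
    {
        lock (gate) { active--; }
    }
}

// Hypothetical worker that simulates 50 ms of asynchronous work.
class ProbedWorker : IWorker
{
    private readonly ConcurrencyProbe probe;

    public ProbedWorker(ConcurrencyProbe probe) => this.probe = probe;

    public async Task<Product> CreateProductAsync()
    {
        probe.Enter();
        try
        {
            await Task.Delay(50); // The call returns to its caller here, still "in flight".
            return new Product();
        }
        finally
        {
            probe.Exit();
        }
    }
}

static class WhenAllDemo
{
    // Runs the question's Select + Task.WhenAll version and reports the peak overlap.
    public static async Task<int> MaxOverlapAsync(int workerCount)
    {
        var probe = new ConcurrencyProbe();
        IEnumerable<IWorker> workers = Enumerable.Range(0, workerCount)
            .Select(_ => (IWorker)new ProbedWorker(probe))
            .ToList();

        // Task.WhenAll enumerates 'tasks', which invokes every CreateProductAsync()
        // before any of the returned tasks has completed.
        var tasks = workers.Select(worker => worker.CreateProductAsync());
        await Task.WhenAll(tasks);
        return probe.MaxActive;
    }
}
```

With three workers, WhenAllDemo.MaxOverlapAsync(3) should report more than one call in flight at the same time (typically all three).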

Here is one possibility to implement it if I had an IReadOnlyList<IWorker> instead of an IEnumerable<IWorker>:

async Task<IEnumerable<Product>> CreateProductsAsync(IReadOnlyList<IWorker> workers)
{
    var resultList = new Product[workers.Count];
    for (int i = 0; i < resultList.Length; ++i)
        resultList[i] = await workers[i].CreateProductAsync();
    return resultList;
}

But I must deal with an IEnumerable, and, even worse, it is usually quite huge; sometimes it is even unlimited, yielding workers forever. If I knew that its size was reasonable, I would just call ToArray() on it and use the method above.
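For a finite sequence, the indexer is not actually required. A sketch assuming only the question's Product and IWorker types, plus a hypothetical FixedWorker used for illustration: a plain foreach with await inside keeps the calls sequential without ToArray(), but, like the array version, it cannot return anything until the last worker has finished, so it does not help with an endless sequence.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

class Product { }

interface IWorker
{
    Task<Product> CreateProductAsync();
}

// Hypothetical worker for illustration: returns a fixed product after a short delay.
class FixedWorker : IWorker
{
    private readonly Product product;

    public FixedWorker(Product product) => this.product = product;

    public async Task<Product> CreateProductAsync()
    {
        await Task.Delay(10); // Simulate asynchronous work.
        return product;
    }
}

static class SequentialProducts
{
    // Awaits each worker before asking the enumerable for the next one, so no two
    // CreateProductAsync() calls overlap. No indexer or ToArray() is needed, but the
    // method only returns after the last worker finishes.
    public static async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
    {
        var results = new List<Product>();
        foreach (IWorker worker in workers)
            results.Add(await worker.CreateProductAsync());
        return results;
    }
}
```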

The ultimate solution would be this:

async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
    foreach (var worker in workers)
        yield return await worker.CreateProductAsync();
}

But yield and await are incompatible, as described in this answer. Looking at that answer, would the hypothetical IAsyncEnumerator mentioned there help me here? Does something similar exist in C# by now?
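For reference: since C# 8.0, async streams exist (built into .NET Core 3.0 / .NET Standard 2.1 and later; older targets can get the interfaces from the Microsoft.Bcl.AsyncInterfaces NuGet package), and the hypothetical method above compiles once its return type is IAsyncEnumerable<Product>. A minimal sketch, with FixedWorker again a hypothetical stand-in for a real worker:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

class Product { }

interface IWorker
{
    Task<Product> CreateProductAsync();
}

// Hypothetical worker for illustration: returns a fixed product after a short delay.
class FixedWorker : IWorker
{
    private readonly Product product;

    public FixedWorker(Product product) => this.product = product;

    public async Task<Product> CreateProductAsync()
    {
        await Task.Delay(10); // Simulate asynchronous work.
        return product;
    }
}

static class AsyncStreamProducts
{
    // With IAsyncEnumerable<T> as the return type, 'async', 'await' and 'yield return'
    // can be combined. The body runs lazily: the next worker is only awaited when the
    // consumer asks for the next item, so an endless 'workers' sequence is fine.
    public static async IAsyncEnumerable<Product> CreateProductsAsync(IEnumerable<IWorker> workers)
    {
        foreach (IWorker worker in workers)
            yield return await worker.CreateProductAsync();
    }
}
```

The consumer has to use await foreach, for example `await foreach (Product product in AsyncStreamProducts.CreateProductsAsync(workers)) { ... }`. Because CheckProducts takes an IEnumerable<Product>, the async stream still cannot be passed to it without blocking on each item.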

A summary of the issues I'm facing:

  • I have a potentially endless IEnumerable<IWorker>
  • I want to asynchronously call CreateProductAsync() on each of them in the same order as they are coming in
  • In the end I need an IEnumerable<Product>

A summary of what I have already tried that doesn't work:

  • I cannot use Task.WhenAll() because the tasks it waits on all run concurrently.
  • I cannot use ToArray() and process that array manually in a loop because my sequence is sometimes endless.
  • I cannot use yield return because it's incompatible with await.

Does anybody have a solution or workaround for me? Otherwise I will have to use that blocking code...


Solution

  • IEnumerator<T> is a synchronous interface, so blocking is unavoidable if CheckProducts requests the next product before the next worker has finished creating it.

    Nevertheless, you can overlap production with consumption by creating products on another thread, adding them to a BlockingCollection<T>, and yielding them on the calling thread:

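    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    static IEnumerable<Product> CreateProducts(IEnumerable<IWorker> workers)
    {
        var products = new BlockingCollection<Product>(3);

        Task producer = Task.Run(async () => // On the thread pool, once enumeration starts...
        {
            try
            {
                foreach (IWorker worker in workers)
                {
                    Product product = await worker.CreateProductAsync(); // Create products serially.
                    products.Add(product); // Enqueue the product, blocking if the queue is full.
                }
            }
            finally
            {
                products.CompleteAdding(); // Notify GetConsumingEnumerable that we're done, even if a worker threw.
            }
        });

        foreach (Product next in products.GetConsumingEnumerable())
            yield return next;

        producer.GetAwaiter().GetResult(); // Rethrow any exception thrown by a worker.
    }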
    

    To avoid unbounded memory consumption, you can optionally specify a bounded capacity for the queue as a constructor argument to BlockingCollection<T>. I used 3 in the code above; once the queue is full, Add blocks the producer until CheckProducts takes a product.
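The effect of the capacity argument can be seen in isolation in a small sketch that uses a BlockingCollection<int> instead of products (BoundedQueueDemo is a hypothetical name): with a bounded capacity of 3, a fourth item cannot be added until the consumer takes one. TryAdd with a zero timeout is used so the sketch reports the full queue instead of blocking the way Add would.

```csharp
using System.Collections.Concurrent;

static class BoundedQueueDemo
{
    // Fills a queue of capacity 3, then checks whether a fourth item fits
    // before and after the consumer takes one item.
    public static (bool WhenFull, int First, bool AfterTake) Run()
    {
        using var queue = new BlockingCollection<int>(boundedCapacity: 3);
        queue.Add(1);
        queue.Add(2);
        queue.Add(3);

        // Add(4) would block here; TryAdd with a 0 ms timeout reports the same condition.
        bool whenFull = queue.TryAdd(4, 0);

        // The default backing store is a ConcurrentQueue<T>, so Take returns the oldest item.
        int first = queue.Take();
        bool afterTake = queue.TryAdd(4, 0);

        return (whenFull, first, afterTake);
    }
}
```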