c# ienumerable taskcompletionsource iasyncenumerable

Build an IAsyncEnumerable using TaskCompletionSource


I have a method that accepts an IEnumerable and returns it transformed using yield return. To transform one element of the enumerable, I first need to know the value of another element. Therefore, I thought of using a TaskCompletionSource to create something like a "promise".

The problem is that this code deadlocks unless the first element has TestFieldA == "a": the iterator awaits the task before it can advance to the element that would complete it. One solution would be to order the enumerable before passing it into the method, in which case there is no need for a TaskCompletionSource at all. I would like to know, however, whether it can be done without this. I also know that this can be done with some LINQ queries, but that would require enumerating the input several times, which I would like to avoid.

This is what I'm trying to achieve (it only works if the first element has TestFieldA == "a"):

class Test
{
    public string TestFieldA { get; set; }
    public int TestFieldB { get; set; }
}


private async IAsyncEnumerable<Test> Transform(IEnumerable<Test> inputEnumerable)
{
    var tcs = new TaskCompletionSource<int>();

    foreach(var input in inputEnumerable)
    {
        if (input.TestFieldA == "a")
        {
            tcs.SetResult(input.TestFieldB);
            yield return input;
        }
        else
        {
            input.TestFieldB -= await tcs.Task;
            yield return input;
        }
    }
}
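
To make the hang concrete, here is a minimal sketch that runs the same Transform logic and checks whether the first MoveNextAsync call completes. The names DeadlockDemo and FirstItemIsStuck are hypothetical and exist only for this illustration. The enumerator is deliberately left undisposed, since this is a diagnostic sketch and not production code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Test
{
    public string TestFieldA { get; set; }
    public int TestFieldB { get; set; }
}

public static class DeadlockDemo
{
    // Same logic as the Transform method above.
    public static async IAsyncEnumerable<Test> Transform(IEnumerable<Test> inputEnumerable)
    {
        var tcs = new TaskCompletionSource<int>();

        foreach (var input in inputEnumerable)
        {
            if (input.TestFieldA == "a")
            {
                tcs.SetResult(input.TestFieldB);
                yield return input;
            }
            else
            {
                // If "a" has not been seen yet, this await never resumes:
                // the loop that would reach "a" is the one suspended here.
                input.TestFieldB -= await tcs.Task;
                yield return input;
            }
        }
    }

    // Hypothetical helper: returns true when the first MoveNextAsync call
    // is still pending, meaning the iterator is stuck on tcs.Task.
    public static bool FirstItemIsStuck(IEnumerable<Test> input)
    {
        var enumerator = Transform(input).GetAsyncEnumerator();
        ValueTask<bool> firstMove = enumerator.MoveNextAsync();
        return !firstMove.IsCompleted;
    }
}
```

With an input whose first element is not "a", FirstItemIsStuck returns true, and nothing can ever complete that pending call, because only the suspended iterator itself could reach the "a" element.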

Solution

  • An idea could be to return an enumerable of tasks instead of an IAsyncEnumerable. Something like this:

    private IEnumerable<Task<Test>> Transform(IEnumerable<Test> source)
    {
        var tcs = new TaskCompletionSource<int>(
            TaskCreationOptions.RunContinuationsAsynchronously);
    
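        foreach (var item in source)
        {
            if (item.TestFieldA == "a")
            {
                // Publish the value and return the "a" item unchanged,
                // as in the question's method.
                tcs.TrySetResult(item.TestFieldB);
                yield return Task.FromResult(item);
            }
            else
            {
                yield return TransformItemAsync(item);
            }
        }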
    
        async Task<Test> TransformItemAsync(Test input)
        {
            var value = await tcs.Task.ConfigureAwait(false);
            input.TestFieldB -= value;
            return input;
        }
    }
    

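    This still deadlocks if the caller awaits each task in sequence while enumerating lazily (for example, awaiting inside a foreach over the result), because the enumerator does not advance to the "a" element until the current task has completed. To solve this, the caller needs a way to await the tasks in order of completion, which also means enumerating the whole sequence before awaiting anything. Stephen Cleary's Nito.AsyncEx library has an extension method for this, OrderByCompletion: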

    // Creates a new collection of tasks that complete in order.
    public static List<Task<T>> OrderByCompletion<T>(this IEnumerable<Task<T>> @this);
    

    You can also grab the source code from here if you want.
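
If you would rather not take a dependency, the same idea can be sketched with Task.WhenAny alone. This is only an illustration under a few assumptions: CompletionOrderDemo and CollectAsync are hypothetical names, the Transform copy below mirrors the method in this answer with the "a" element passed through unchanged as in the question, and the input is assumed to contain at least one "a" element.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public class Test
{
    public string TestFieldA { get; set; }
    public int TestFieldB { get; set; }
}

public static class CompletionOrderDemo
{
    // Mirrors the Transform method in this answer.
    public static IEnumerable<Task<Test>> Transform(IEnumerable<Test> source)
    {
        var tcs = new TaskCompletionSource<int>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        foreach (var item in source)
        {
            if (item.TestFieldA == "a")
            {
                tcs.TrySetResult(item.TestFieldB);
                yield return Task.FromResult(item);
            }
            else
            {
                yield return TransformItemAsync(item);
            }
        }

        async Task<Test> TransformItemAsync(Test input)
        {
            var value = await tcs.Task.ConfigureAwait(false);
            input.TestFieldB -= value;
            return input;
        }
    }

    // Hypothetical helper: collects the results in order of completion.
    public static async Task<List<Test>> CollectAsync(IEnumerable<Test> source)
    {
        // ToList enumerates the source exactly once and creates every task,
        // so the "a" element is reached before anything is awaited.
        var pending = Transform(source).ToList();
        var results = new List<Test>();

        while (pending.Count > 0)
        {
            // WhenAny returns one task that has already completed.
            Task<Test> finished = await Task.WhenAny(pending);
            pending.Remove(finished);
            results.Add(await finished);
        }

        return results;
    }
}
```

The order of the results depends on when each continuation runs, so it is not guaranteed to match the input order. Calling Task.WhenAny in a loop is quadratic in the number of tasks, which is fine for small inputs. If the input contains no "a" element, the transformed tasks never complete and CollectAsync never returns.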