c#, async-await, task, typeerror, wait

.NET Task returning object and calling Async inside


I have several tasks executing in a WhenAll(). I get a semantic error when a task returns an object and calls an async method inside its lambda. The async method fetches string content from blob storage, then constructs and returns an object.

Do you know how to solve this issue, while maintaining the batch download done by tasks? I need a list with those FinalWrapperObjects.

Error message

Cannot convert async lambda expression to delegate type 'Func&lt;FinalWrapperObject&gt;'. An async lambda expression may return void, Task or Task&lt;T&gt;, none of which are convertible to 'Func&lt;FinalWrapperObject&gt;'.

...
List<FinalWrapperObject> finalReturns = new List<FinalWrapperObject>();
List<Task<FinalWrapperObject>> tasks = new List<Task<FinalWrapperObject>>();
var resultsBatch = fetchedObjects.Skip(i).Take(10).ToList();

foreach (var resultBatchItem in resultsBatch)
{
    tasks.Add(
        new Task<FinalWrapperObject>(async () => //!! errors here on arrow
        {
            var blobContent = await azureBlobService.GetAsync(resultBatchItem.StoragePath);
            return new FinalWrapperObject {
                BlobContent = blobContent,
                CreationDateTime = resultBatchItem.CreationDateTime
            };
        })
    );
}

FinalWrapperObject[] listFinalWrapperObjects = await Task.WhenAll(tasks);
finalReturns.AddRange(listFinalWrapperObjects);

return finalReturns;

Solution

  • Your code never starts any tasks: the Task constructor creates cold tasks that must be started explicitly. Tasks aren't threads anyway. They're a promise that something will complete and maybe produce a value in the future. Some tasks require a thread to run; these are executed using threads that come from a thread pool. Others, e.g. async IO operations, don't require a thread at all. Fetching blob content is such an IO operation.
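    The compile error itself comes from the Task constructor's parameter type: it accepts a Func&lt;TResult&gt;, so an async lambda producing a Task&lt;FinalWrapperObject&gt; doesn't fit. Task.Run, by contrast, has an overload that accepts Func&lt;Task&lt;TResult&gt;&gt; and unwraps it, so this sketch of the original loop would compile and start the work immediately:

    foreach (var resultBatchItem in resultsBatch)
    {
        // Task.Run accepts an async delegate and unwraps the inner task,
        // yielding a Task<FinalWrapperObject> that is already running
        tasks.Add(Task.Run(async () =>
        {
            var blobContent = await azureBlobService.GetAsync(resultBatchItem.StoragePath);
            return new FinalWrapperObject
            {
                BlobContent = blobContent,
                CreationDateTime = resultBatchItem.CreationDateTime
            };
        }));
    }

    That said, Task.Run schedules thread-pool work that pure IO doesn't need, which is why the approach below drops it entirely.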

    Your lambda is asynchronous and already returns a Task, so there's no reason to use the Task constructor or Task.Run. You can call the method once for each item, collect the Tasks in a list and await all of them. That's the bare-bones way:

    async Task<FinalWrapperObject> UploadItemAsync(BatchItem resultBatchItem)
    {
        var blobContent = await azureBlobService.GetAsync(resultBatchItem.StoragePath);
        return new FinalWrapperObject {
            BlobContent = blobContent,
            CreationDateTime = resultBatchItem.CreationDateTime
        };
    }
    
    ...
    
    var tasks = resultsBatch.Select(UploadItemAsync);
    var results = await Task.WhenAll(tasks);
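
    Note that this bare-bones approach starts every download at once, with no concurrency limit. If you want throttling without pulling in Dataflow, one common sketch uses SemaphoreSlim as a gate (the limit of 10 here is illustrative):

    // Allow at most 10 concurrent downloads; each task waits for a slot
    var throttle = new SemaphoreSlim(10);
    var tasks = resultsBatch.Select(async item =>
    {
        await throttle.WaitAsync();
        try
        {
            return await UploadItemAsync(item);
        }
        finally
        {
            throttle.Release(); // free the slot even if the download throws
        }
    });
    var results = await Task.WhenAll(tasks);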
    

    Using TPL Dataflow

    A better option would be to use the TPL Dataflow classes to upload items concurrently and even construct a pipeline from processing blocks.

    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 10
    };
    var results = new BufferBlock<FinalWrapperObject>();
    var uploader = new TransformBlock<BatchItem, FinalWrapperObject>(UploadItemAsync, options);
    uploader.LinkTo(results);
    
    foreach (var item in fetchedObjects)
    {
        uploader.Post(item);
    }
    uploader.Complete();
    await uploader.Completion;
    

    By default, a block only processes one message at a time. With MaxDegreeOfParallelism = 10 we're telling it to process up to 10 items concurrently, as long as there are items posted to the uploader block.

    The results are forwarded to the results BufferBlock. The items can be extracted with TryReceiveAll:

    IList<FinalWrapperObject> items;
    results.TryReceiveAll(out items);
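
    One caveat: a plain LinkTo does not propagate completion, so results.Completion never finishes on its own; that's why the snippet awaits uploader.Completion and then drains the buffer. If you would rather await the results block itself, link with PropagateCompletion:

    // Forward the uploader's completion to the buffer, so
    // `await results.Completion` finishes once all items are through
    uploader.LinkTo(results, new DataflowLinkOptions { PropagateCompletion = true });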
    

    Dataflow blocks can be combined into a pipeline. You could have a block that loads items from disk, another to upload them and a final one that stores the response to another file or database:

    var dop10 = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 10,
        BoundedCapacity = 4
    };
    var bounded = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 4
    };
    
    var loader = new TransformBlock<FileInfo, BatchItem>(LoadFile, bounded);
    var uploader = new TransformBlock<BatchItem, FinalWrapperObject>(UploadItemAsync, dop10);
    // ActionBlock requires a delegate; SaveToDbAsync stands in for whatever method stores the result
    var dbLogger = new ActionBlock<FinalWrapperObject>(SaveToDbAsync, bounded);
    
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    loader.LinkTo(uploader, linkOptions);
    uploader.LinkTo(dbLogger, linkOptions);
    
    var folder = new DirectoryInfo(rootPath);
    foreach (var item in folder.EnumerateFiles())
    {
        await loader.SendAsync(item);
    }
    loader.Complete();
    
    await dbLogger.Completion;
    

    In this case, all files in a folder are posted to the loader block, which loads files one by one and forwards a BatchItem for each. The uploader uploads the content and the results are stored by dbLogger. In the end, we tell the loader we're finished and wait for all items to be processed all the way to the end with await dbLogger.Completion.

    BoundedCapacity puts a limit on how many items each block's input buffer can hold. This prevents loading all files into memory at once: when a buffer is full, SendAsync awaits until the downstream block has room, applying backpressure all the way up the pipeline.