I have some tasks executing in a WhenAll(). I get a semantic error if a task returns an object and calls an async method inside its Run(). The async method fetches some string content from Blob storage, then constructs and returns an object. Do you know how to solve this issue, while maintaining the batch download done by the tasks? I need a list with those FinalWrapperObjects.
Cannot convert async lambda expression to delegate type 'Func<FinalWrapperObject>'. An async lambda expression may return void, Task or Task<T>, none of which are convertible to 'Func<FinalWrapperObject>'.
...
List<FinalWrapperObject> finalReturns = new List<FinalWrapperObject>();
List<Task<FinalWrapperObject>> tasks = new List<Task<FinalWrapperObject>>();

var resultsBatch = fetchedObjects.Skip(i).Take(10).ToList();

foreach (var resultBatchItem in resultsBatch)
{
    tasks.Add(
        new Task<FinalWrapperObject>(async () => //!! errors here on arrow
        {
            var blobContent = await azureBlobService.GetAsync(resultBatchItem.StoragePath);
            return new FinalWrapperObject
            {
                BlobContent = blobContent,
                CreationDateTime = resultBatchItem.CreationDateTime
            };
        })
    );
}

FinalWrapperObject[] listFinalWrapperObjects = await Task.WhenAll(tasks);
finalReturns.AddRange(listFinalWrapperObjects);
return finalReturns;
Your code never starts any tasks. Tasks aren't threads anyway; they're a promise that something will complete, and maybe produce a value, in the future. Some tasks require a thread to run; these execute on threads drawn from a thread pool. Others, e.g. async IO operations, don't require a thread at all. Fetching a blob's content is such an IO operation.
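To make that concrete, here's a minimal sketch (hypothetical values, not from the question's code) contrasting a "cold" constructor-created task, which never runs until Start() is called, with the "hot" task an async method returns:

```csharp
using System;
using System.Threading.Tasks;

class ColdTaskDemo
{
    static async Task Main()
    {
        // A Task created with the constructor is "cold": it never runs
        // until Start() is called, so awaiting it here would hang forever.
        var cold = new Task<int>(() => 42);
        Console.WriteLine(cold.Status); // Created

        // Calling an async method returns a "hot" task that is already
        // in flight; there is nothing to start.
        Task<int> hot = GetValueAsync();
        Console.WriteLine(await hot);
    }

    static async Task<int> GetValueAsync()
    {
        await Task.Delay(10); // stand-in for real async IO
        return 42;
    }
}
```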
Your lambda is asynchronous and already returns a Task<FinalWrapperObject>, so there's no reason to use Task.Run. You can execute it once for each item, collect the Tasks in a list and await all of them. That's the bare-bones way:
async Task<FinalWrapperObject> UploadItemAsync(BatchItem resultBatchItem)
{
    var blobContent = await azureBlobService.GetAsync(resultBatchItem.StoragePath);
    return new FinalWrapperObject
    {
        BlobContent = blobContent,
        CreationDateTime = resultBatchItem.CreationDateTime
    };
}
...
var tasks = resultsBatch.Select(UploadItemAsync);
var results = await Task.WhenAll(tasks);
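If you still want to cap how many downloads run at once, you don't need manual Skip/Take batching; a SemaphoreSlim can throttle the lambda instead. A sketch, assuming the same UploadItemAsync method as above:

```csharp
// Sketch: limit concurrent downloads to 10 with a SemaphoreSlim,
// instead of splitting the input into Skip/Take batches of 10.
var throttle = new SemaphoreSlim(10);

var tasks = resultsBatch.Select(async item =>
{
    await throttle.WaitAsync();      // blocks asynchronously once 10 are in flight
    try
    {
        return await UploadItemAsync(item);
    }
    finally
    {
        throttle.Release();          // free a slot for the next item
    }
});

var results = await Task.WhenAll(tasks);
```

Unlike fixed batches, this keeps 10 downloads in flight continuously: as soon as one finishes, the next one starts.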
Using TPL Dataflow
A better option would be to use the TPL Dataflow classes to upload items concurrently and even construct a pipeline from processing blocks.
var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10
};
var results = new BufferBlock<FinalWrapperObject>();
var uploader = new TransformBlock<BatchItem, FinalWrapperObject>(UploadItemAsync, options);
uploader.LinkTo(results);

foreach (var item in fetchedObjects)
{
    uploader.Post(item);
}
uploader.Complete();
await uploader.Completion;
By default, a block only processes one message at a time. With MaxDegreeOfParallelism = 10 we're telling it to process 10 items concurrently. This code will upload 10 items at a time, as long as there are items to post to the uploader block. The results are forwarded to the results BufferBlock. The items can be extracted with TryReceiveAll:
IList<FinalWrapperObject> items;
results.TryReceiveAll(out items);
Dataflow blocks can be combined into a pipeline. You could have a block that loads items from disk, another to upload them and a final one that stores the response to another file or database :
var dop10 = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 10,
    BoundedCapacity = 4
};
var bounded = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 4
};

var loader = new TransformBlock<FileInfo, BatchItem>(LoadFile, bounded);
var uploader = new TransformBlock<BatchItem, FinalWrapperObject>(UploadItemAsync, dop10);
// ActionBlock needs a delegate; LogToDbAsync stands in for your own persistence method
var dbLogger = new ActionBlock<FinalWrapperObject>(LogToDbAsync, bounded);
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
loader.LinkTo(uploader, linkOptions);
uploader.LinkTo(dbLogger, linkOptions);

var folder = new DirectoryInfo(rootPath);
foreach (var item in folder.EnumerateFiles())
{
    await loader.SendAsync(item);
}
loader.Complete();
await dbLogger.Completion;
In this case, all files in a folder are posted to the loader block, which loads files one by one and forwards a BatchItem. The uploader uploads the file and the results are stored by dbLogger. In the end, we tell loader we're finished and wait for all items to get processed all the way to the end with await dbLogger.Completion.
The BoundedCapacity option limits how many items each block's input buffer can hold. This prevents loading all files into memory at once.
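The bound also changes how posting behaves: on a full bounded block, Post returns false and drops the message, while SendAsync asynchronously waits for space, which is why the loop above uses SendAsync. A minimal sketch (hypothetical block, not from the pipeline above):

```csharp
// Sketch: backpressure on a bounded block.
var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = 4 };

// A deliberately slow consumer so the input buffer fills up.
var slow = new ActionBlock<int>(async n => await Task.Delay(1000), bounded);

slow.Post(1);                    // accepted while there is room
// ...once the capacity of 4 is reached:
bool accepted = slow.Post(99);   // returns false, message is dropped
await slow.SendAsync(100);       // instead, waits until the block has room
```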