How to write to an object after TransformBlock?


I have a list of objects that I need to iterate over in parallel. This is what I need to do:

foreach (var r in results)
{
    r.SomeList = await apiHelper.Get(r.Id);
}

Since I want to parallelize it, I tried to use Parallel.ForEach(), but it doesn't wait until everything is actually complete, because the lambda awaits apiHelper.Get() and Parallel.ForEach() returns without waiting for that await to finish.

Parallel.ForEach(
    results,
    async (r) =>
    {
        r.SomeList = await apiHelper.Get(r.Id);
    });

So I searched online and found this: Nesting await in Parallel.ForEach

I am very new to TPL (about 20 minutes in), so I may be missing something obvious. How do I proceed?

        var getBlock = new TransformBlock<string, List<Something>>(
            async i =>
            {
                var c = await apiHelper.Get(i);
                return c;
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        foreach (var r in results)
        {
            r.SomeList = getBlock.Post(r.Id);  // ERROR: Can't convert boolean to list.
        }

        getBlock.Complete();

Solution

  • Here is an example using the ActionBlock class from the TPL Dataflow library.

    It gives you both parallelism and async support, and it is fairly easy to understand.

    Dataflow example

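    // Requires: using System.Threading.Tasks.Dataflow;
    // Not static, because it passes the instance method MyMethodAsync to the block.
    public async Task DoWorkLoads(List<Something> results)
    {
       // Cap how many calls run concurrently.
       var options = new ExecutionDataflowBlockOptions
                         {
                            MaxDegreeOfParallelism = 50
                         };

       var block = new ActionBlock<Something>(MyMethodAsync, options);

       foreach (var result in results)
          block.Post(result);

       // Signal that no more items are coming, then wait for queued work to finish.
       block.Complete();
       await block.Completion;
    }

    ...

    public async Task MyMethodAsync(Something result)
    {
       result.SomeList = await apiHelper.Get(result.Id);
    }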
    

    Obviously, you will still need error handling, and you can tune the details (such as MaxDegreeOfParallelism) to taste.
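
    On the error handling point, here is one possible sketch. If an exception escapes the delegate, the ActionBlock faults and the exception surfaces when you await block.Completion, so this version catches per item and collects the ids that failed. Item, FakeGetAsync and FillAsync are made-up names for illustration; FakeGetAsync stands in for apiHelper.Get, and the parallelism of 4 is arbitrary.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    // Hypothetical item type: an id plus the list to fill in.
    public class Item
    {
        public string Id { get; set; }
        public List<string> SomeList { get; set; }
    }

    public static class WorkLoads
    {
        // Hypothetical stand-in for apiHelper.Get; fails for the id "bad".
        public static async Task<List<string>> FakeGetAsync(string id)
        {
            await Task.Delay(10);
            if (id == "bad")
                throw new InvalidOperationException("lookup failed for " + id);
            return new List<string> { id + "-a", id + "-b" };
        }

        // Fills SomeList on every item and returns the ids whose lookup failed.
        public static async Task<string[]> FillAsync(IEnumerable<Item> items)
        {
            // Thread-safe collection, since several delegates may run at once.
            var failedIds = new ConcurrentBag<string>();

            var block = new ActionBlock<Item>(async item =>
            {
                try
                {
                    item.SomeList = await FakeGetAsync(item.Id);
                }
                catch (Exception)
                {
                    // Handling the error here keeps the block running for the other items.
                    failedIds.Add(item.Id);
                }
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            foreach (var item in items)
                block.Post(item);

            block.Complete();
            await block.Completion;
            return failedIds.ToArray();
        }
    }
    ```

    Whether to swallow, retry or rethrow is up to you; the only point is that the decision has to be made inside the delegate if one bad item should not stop the rest.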

    Also, it's assuming apiHelper is thread-safe, since up to 50 calls to Get can be in flight at once.
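
    If you would rather keep the TransformBlock from the question: Post only returns a bool saying whether the block accepted the item, which is why that assignment does not compile. One way around it, sketched below, is to send the whole object through the block so the output stays paired with its owner, then link to an ActionBlock that does the assignment. Row, FakeGetAsync and the Pipeline class are made-up names, and FakeGetAsync again stands in for apiHelper.Get.

    ```csharp
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    // Hypothetical item type: an id plus the list to fill in.
    public class Row
    {
        public string Id { get; set; }
        public List<string> SomeList { get; set; }
    }

    public static class Pipeline
    {
        // Hypothetical stand-in for apiHelper.Get.
        public static async Task<List<string>> FakeGetAsync(string id)
        {
            await Task.Delay(10);
            return new List<string> { id + "-1", id + "-2" };
        }

        public static async Task FillAsync(IEnumerable<Row> rows)
        {
            // Output carries the owning row alongside the fetched list.
            var fetch = new TransformBlock<Row, (Row Owner, List<string> Fetched)>(
                async row =>
                {
                    var fetched = await FakeGetAsync(row.Id);
                    return (row, fetched);
                },
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            // Writes each fetched list back to the row it belongs to.
            var assign = new ActionBlock<(Row Owner, List<string> Fetched)>(
                pair => { pair.Owner.SomeList = pair.Fetched; });

            // PropagateCompletion makes completing fetch also complete assign
            // once all of its output has been passed along.
            fetch.LinkTo(assign, new DataflowLinkOptions { PropagateCompletion = true });

            foreach (var row in rows)
                fetch.Post(row);

            fetch.Complete();
            await assign.Completion;
        }
    }
    ```

    For a single write-back step this is more machinery than the ActionBlock version above, so it mostly pays off when you want further stages after the fetch.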