I'm currently using an ActionBlock to process asynchronous jobs that are started serially. It works very well for processing each item posted to it, but it offers no way to collect the results of the jobs.
What can I use to collect the results of my jobs in a thread-safe manner?
My code is currently something like this:
var actionBlock = new ActionBlock<int>(async i => await Process(i));
for (int i = 0; i < 100; i++)
{
    actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;
I've tried using a TransformBlock instead, but it hangs indefinitely when awaiting the Completion. The completion's status is "WaitingForActivation".
My code with the TransformBlock is something like this:
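var transformBlock = new TransformBlock<int, string>(async i => await Process(i));
for (int i = 0; i < 100; i++)
{
    transformBlock.Post(i);
}
transformBlock.Complete();
// Hangs here: nothing has consumed the block's output yet, and a TransformBlock's
// Completion task does not finish until its output buffer has been emptied.
await transformBlock.Completion;
transformBlock.TryReceiveAll(out IList<string> strings);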
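For comparison, here is a minimal sketch of how the TransformBlock version could be made to complete: link it to a consumer so its output is drained, then await the consumer's completion rather than the TransformBlock's. The `Process` body, the `collector` block and the `results` list are hypothetical stand-ins for illustration, not part of my real code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the real Process method.
static async Task<string> Process(int i)
{
    await Task.Delay(1);
    return $"result {i}";
}

var transformBlock = new TransformBlock<int, string>(i => Process(i));

// The collector drains the TransformBlock's output. An ActionBlock runs one
// item at a time by default, so a plain List<string> is sufficient here.
var results = new List<string>();
var collector = new ActionBlock<string>(s => results.Add(s));

// PropagateCompletion forwards Complete() from the source to the collector.
transformBlock.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 100; i++)
{
    transformBlock.Post(i);
}
transformBlock.Complete();

// Await the end of the pipeline, not the TransformBlock itself.
await collector.Completion;
```

With the default block options the TransformBlock emits its outputs in input order, so `results` should end up in the same order the items were posted.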
It turns out a ConcurrentBag is the answer:
var bag = new ConcurrentBag<string>();
var actionBlock = new ActionBlock<int>(async i =>
    bag.Add(await Process(i))
);
for (int i = 0; i < 100; i++)
{
    actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;
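Now 'bag' holds all the results and can be enumerated as an IEnumerable<string>. Note that a ConcurrentBag is unordered, so the results will not necessarily come out in the order the inputs were posted.
The code I actually ended up using replaces the ActionBlock with Parallel.ForEach: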
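Parallel.ForEach(
    inputData,
    // Parallel.ForEach takes a synchronous delegate, so await cannot be used here;
    // block on the task instead.
    i => bag.Add(Process(i).GetAwaiter().GetResult())
);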
This is considerably simpler, seems to perform about as well, and still offers options such as limiting the degree of parallelism.
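As a rough sketch of what limiting the degree of parallelism might look like: the version below assumes .NET 6 or later and swaps in Parallel.ForEachAsync (not the Parallel.ForEach call above) so that Process can be awaited rather than blocked on. The `Process` body, the `inputData` range and the limit of 4 are hypothetical values chosen for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the real Process method.
static async Task<string> Process(int i)
{
    await Task.Delay(1);
    return $"result {i}";
}

var inputData = Enumerable.Range(0, 100);
var bag = new ConcurrentBag<string>();

// Cap the number of items processed concurrently (4 is an arbitrary choice).
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

// Parallel.ForEachAsync accepts an async body, so the result can be awaited.
await Parallel.ForEachAsync(inputData, options, async (i, cancellationToken) =>
{
    bag.Add(await Process(i));
});

Console.WriteLine(bag.Count);
```

The same ParallelOptions object can also be passed to the synchronous Parallel.ForEach overloads if the body does not need to await anything.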