I was trying out a very simple producer/consumer approach using BlockingCollection, following the article described here: MS Docs, to understand multithreading.
My producer is a single task that reads from an XML file (around 4,000 nodes) and pushes XElement nodes into a blocking collection.
My consumer has multiple threads reading from the blocking collection and uploading files to a site based on each XElement.
The problem is that the program unexpectedly exits every time I run it. It hits the producer Task.Run
but stops after that. I can't understand why. Am I doing something wrong? It doesn't even hit the catch
block.
Code is below:
BlockingCollection<XElement> collection = new BlockingCollection<XElement>(100);
string metadataFilePath = exportLocation + listTitle + "\\Metadata\\" + exportJobId + ".xml";

// Create the producer
Task.Run(() =>
{
    // Process only the files that have not been uploaded
    XDocument xmlFile = XDocument.Load(metadataFilePath);
    var query = from c in xmlFile.Elements("Items").Elements("Item")
                where c.Attribute("IsUploaded").Value == "No"
                select c;
    foreach (var item in query)
    {
        collection.Add(item);
    }
    collection.CompleteAdding();
});

// Process with the consumer
Parallel.ForEach(
    collection,
    new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 2 },
    (metadata) =>
    {
        ProcessItems();
    });
The answer by Nish26 is correct for the issue in the question.
I would propose solving your producer/consumer problem with Microsoft TPL Dataflow instead:
using System.Threading.Tasks.Dataflow;

var parallelBoundedOptions = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 100,
    MaxDegreeOfParallelism = 2,
};
var uploadItemBlock = new ActionBlock<XElement>(
    item => ProcessItem(item),
    parallelBoundedOptions);

string metadataFilePath = exportLocation + listTitle + "\\Metadata\\" + exportJobId + ".xml";
XDocument xmlFile = XDocument.Load(metadataFilePath);
var query = from c in xmlFile.Elements("Items").Elements("Item")
            where c.Attribute("IsUploaded").Value == "No"
            select c;
foreach (var item in query)
{
    uploadItemBlock.SendAsync(item).Wait();
}
uploadItemBlock.Complete();
uploadItemBlock.Completion.Wait();
Dataflow makes it easier to focus on producing and consuming the items instead of how to pass them from the producer to the consumer.
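As a side note, the blocking .Wait() calls above can be replaced with await when the surrounding method is async. A sketch (assuming the same ProcessItem method and file path as above):

async Task ProduceAndUploadAsync(string metadataFilePath)
{
    var uploadItemBlock = new ActionBlock<XElement>(
        item => ProcessItem(item),
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 100,          // back-pressure: SendAsync waits while the buffer is full
            MaxDegreeOfParallelism = 2,
        });

    XDocument xmlFile = XDocument.Load(metadataFilePath);
    var query = from c in xmlFile.Elements("Items").Elements("Item")
                where c.Attribute("IsUploaded").Value == "No"
                select c;

    foreach (var item in query)
    {
        await uploadItemBlock.SendAsync(item); // awaits instead of blocking a thread
    }

    uploadItemBlock.Complete();
    await uploadItemBlock.Completion;
}

This keeps the producer thread free while the block's bounded buffer is full, which matters if you call this from a UI thread or ASP.NET request.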
The actual issue in the question is that Parallel.ForEach
uses the plain IEnumerable<T>.GetEnumerator implementation of BlockingCollection<T>
, which enumerates only a snapshot of the collection's current contents, instead of BlockingCollection<T>.GetConsumingEnumerable
, which blocks until the producer calls CompleteAdding. This is demonstrated here:
static void Main()
{
    var collection = new BlockingCollection<int>(100);
    Task.Run(() =>
    {
        foreach (var element in Enumerable.Range(0, 100_000))
        {
            collection.Add(element);
        }
        collection.CompleteAdding();
    });
    Parallel.ForEach(
        collection,
        new ParallelOptions { MaxDegreeOfParallelism = 2 },
        i => Console.WriteLine(i));
    Console.WriteLine("Done");
}
Prints "Done" almost immediately: the snapshot enumerator never waits for the producer, so the loop ends as soon as the current contents are exhausted.
static void Main()
{
    var collection = new BlockingCollection<int>(100);
    Task.Run(() =>
    {
        foreach (var element in Enumerable.Range(0, 100_000))
        {
            collection.Add(element);
        }
        collection.CompleteAdding();
    });
    Parallel.ForEach(
        collection.GetConsumingEnumerable(),
        new ParallelOptions { MaxDegreeOfParallelism = 2 },
        i => Console.WriteLine(i));
    Console.WriteLine("Done");
}
Prints all 100,000 numbers and then "Done".
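One caveat worth noting: Parallel.ForEach's default chunk partitioner buffers items from the enumerable before handing them to workers, which can delay items that are already available. The Partitioner.Create overload that takes EnumerablePartitionerOptions.NoBuffering (in System.Collections.Concurrent, .NET 4.5+) hands each item to a worker as soon as it is consumed; a sketch of the consuming loop adjusted accordingly:

using System.Collections.Concurrent;

Parallel.ForEach(
    Partitioner.Create(
        collection.GetConsumingEnumerable(),
        EnumerablePartitionerOptions.NoBuffering), // take one item at a time, no chunk buffering
    new ParallelOptions { MaxDegreeOfParallelism = 2 },
    i => Console.WriteLine(i));

For short-lived items like Console.WriteLine this makes little difference, but for slow uploads it prevents items sitting unprocessed in a partitioner buffer.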