Application unexpectedly closes while using multithreading


To understand multithreading, I was trying out a very simple producer/consumer approach using BlockingCollection, following the article on MS Docs.

My producer is a single task which reads from an XML file (has around 4000 nodes) and pushes XElement nodes to a blocking collection.

My consumer will have multiple threads reading from the blocking collection and uploading files to a site based on the XElement.

The problem is that the program closes unexpectedly every time I run it. It reaches the producer's Task.Run but stops after that, and it doesn't even hit the catch block. I can't work out why. Am I doing something wrong?

Code is below:

            BlockingCollection<XElement> collection = new BlockingCollection<XElement>(100);                
            string metadataFilePath = exportLocation + listTitle + "\\Metadata\\" + exportJobId + ".xml";
            //create the producer
            Task.Run(() =>
            {                    
                //Process only the files that have not been uploaded                                   
                XDocument xmlFile = XDocument.Load(metadataFilePath);
                var query = from c in xmlFile.Elements("Items").Elements("Item")
                            where c.Attribute("IsUploaded").Value == "No"
                            select c;
                foreach (var item in query)
                {
                    collection.Add(item);
                }
                collection.CompleteAdding();
            });

            //process consumer
            Parallel.ForEach(collection, (new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 2 }), (metadata) => {
                ProcessItems();
            });

Solution

  • The answer by Nish26 is correct for the issue in the question.

    I would propose solving your producer/consumer problem with Microsoft TPL Dataflow instead:

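    using System.Threading.Tasks.Dataflow;
    using System.Xml.Linq;
    
    // At most 100 items are buffered, and at most 2 are processed at a time.
    var parallelBoundedOptions = new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 100,
        MaxDegreeOfParallelism = 2,
    };
    // Consumer: runs ProcessItem for every XElement sent to the block.
    var uploadItemBlock = new ActionBlock<XElement>(
        item => ProcessItem(item),
        parallelBoundedOptions
    );
    // Producer: select only the items that have not been uploaded yet.
    string metadataFilePath = exportLocation + listTitle + "\\Metadata\\" + exportJobId + ".xml";
    XDocument xmlFile = XDocument.Load(metadataFilePath);
    var query = from c in xmlFile.Elements("Items").Elements("Item")
                where c.Attribute("IsUploaded").Value == "No"
                select c;
    foreach (var item in query)
    {
        // Waits while the block is at its bounded capacity.
        uploadItemBlock.SendAsync(item).Wait();
    }
    // Signal that no more items will be sent, then wait for processing to finish.
    uploadItemBlock.Complete();
    uploadItemBlock.Completion.Wait();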
    

    Dataflow makes it easier to focus on producing and consuming the items instead of how to pass them from the producer to the consumer.
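
    The example above blocks the calling thread with SendAsync(item).Wait() and Completion.Wait(). If the calling code can be asynchronous, the same pipeline can be awaited instead. The following is only a minimal sketch under assumptions: it uses integer items and a simulated upload (Task.Delay) in place of the question's XElement items and ProcessItem, and the class and method names are made up for illustration. On .NET Framework, System.Threading.Tasks.Dataflow comes from the NuGet package of the same name.

    ```csharp
    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class AsyncDataflowSketch
    {
        // Sends itemCount integers through a bounded ActionBlock and
        // returns how many of them were processed.
        public static async Task<int> RunAsync(int itemCount)
        {
            int processed = 0;

            var block = new ActionBlock<int>(
                async item =>
                {
                    // Hypothetical stand-in for the upload step.
                    await Task.Delay(1);
                    Interlocked.Increment(ref processed);
                },
                new ExecutionDataflowBlockOptions
                {
                    BoundedCapacity = 100,
                    MaxDegreeOfParallelism = 2,
                });

            foreach (var item in Enumerable.Range(0, itemCount))
            {
                // Asynchronously waits while the block is at capacity.
                await block.SendAsync(item);
            }

            block.Complete();        // no more items will be sent
            await block.Completion;  // completes once every item is processed
            return processed;
        }

        public static async Task Main()
        {
            Console.WriteLine(await RunAsync(250));
        }
    }
    ```

    Awaiting SendAsync keeps the back-pressure of the bounded block without tying up a thread while the block is full.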

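    The actual issue in the question is that Parallel.ForEach enumerates the collection through BlockingCollection<T>'s explicit IEnumerable<T>.GetEnumerator implementation instead of BlockingCollection<T>.GetConsumingEnumerable. The former returns a standard, non-blocking enumerator over whatever the collection holds at that moment and does not wait for the producer. The latter blocks until items arrive and ends only after CompleteAdding has been called and the collection is empty. The two programs below demonstrate the difference: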

    static void Main()
    {
        var collection = new BlockingCollection<int>(100);
        Task.Run(()=>
        {
            foreach (var element in Enumerable.Range(0, 100_000))
            {
                collection.Add(element);
            }
            collection.CompleteAdding();
        });
    
        Parallel.ForEach(
            collection, 
            new ParallelOptions { MaxDegreeOfParallelism = 2},
            i => Console.WriteLine(i));
    
        Console.WriteLine("Done");
    }
    

    This prints "Done" almost immediately, because the loop ends without waiting for the producer to add its items.

    static void Main()
    {
        var collection = new BlockingCollection<int>(100);
        Task.Run(()=>
        {
            foreach (var element in Enumerable.Range(0, 100_000))
            {
                collection.Add(element);
            }
            collection.CompleteAdding();
        });
    
        Parallel.ForEach(
            collection.GetConsumingEnumerable(), 
            new ParallelOptions { MaxDegreeOfParallelism = 2},
            i => Console.WriteLine(i));
    
        Console.WriteLine("Done");
    }
    

    This prints all the numbers (not necessarily in order, since two workers run in parallel) and then "Done".
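
    The difference between the two enumerators can also be seen without any timing involved. The sketch below is illustrative only (the class and method names are made up): it fills a collection up front, then counts how many items each kind of enumeration sees and how many remain in the collection afterwards.

    ```csharp
    using System;
    using System.Collections.Concurrent;

    public static class EnumerationDemo
    {
        // Builds a collection that already holds five items and is marked complete.
        private static BlockingCollection<int> CreateFilled()
        {
            var collection = new BlockingCollection<int>(100);
            for (int i = 0; i < 5; i++)
            {
                collection.Add(i);
            }
            collection.CompleteAdding();
            return collection;
        }

        // Enumerates through IEnumerable<T>: items are read but not removed.
        public static (int seen, int left) EnumeratePlain()
        {
            var collection = CreateFilled();
            int seen = 0;
            foreach (var item in collection)
            {
                seen++;
            }
            return (seen, collection.Count);
        }

        // Enumerates through GetConsumingEnumerable: each item is taken out
        // of the collection as it is read.
        public static (int seen, int left) EnumerateConsuming()
        {
            var collection = CreateFilled();
            int seen = 0;
            foreach (var item in collection.GetConsumingEnumerable())
            {
                seen++;
            }
            return (seen, collection.Count);
        }

        public static void Main()
        {
            Console.WriteLine(EnumeratePlain());      // (5, 5)
            Console.WriteLine(EnumerateConsuming());  // (5, 0)
        }
    }
    ```

    Applied to the code in the question, the corresponding change is to pass collection.GetConsumingEnumerable() as the first argument of Parallel.ForEach.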