Search code examples
c#dictionaryblockingcollection

BlockingCollection and Dictionary


I have a question about writing codes using BlockingCollection and Dictionary.

My goal is to read a bunch of text files and process them in a parallel fashion. The processed data would be stored in a BlockingCollection instance so that these processed data can be written to a file.

The reason why I want to use the BlockingCollection is ...

(1) to save time while the GenerateDataFiles() is doing CPU-intensive work and the consumer Task can do IO related work in the meantime, and

(2) to reduce memory usage compared to the case when I store all the processed data in a list before writing any of them to a file.

For (2), if I store all data before writing them to a file, the memory consumption is more than my desktop can afford (since it reads more than 30GB of data), and so I have to use this producer-consumer approach.

Also, I got a problem in inserting the key-value pair in the BlockingCollection instance (or dictionary). Please indicate the correct approach to do the data insertion.

The following codes are my attempt to address the problem. I may have made some mistakes in this since I am new to BlockingCollection. Please suggest some changes (and amended codes) so that I can resolve the problem.

class SampleClass
{
    static void Main(string[] args)
    {            
        SampleClass sampleClass = new SampleClass();
        sampleClass.run();
    }

    private void run()
    {
        Task consumer = Task.Factory.StartNew(() => WriteDataToFiles());
        GenerateDataFiles();
    }

    BlockingCollection<Dictionary<string, List<string>>> bc = new BlockingCollection<Dictionary<string, List<string>>>();

    private void GenerateDataFiles()
    {
        DirectoryInfo directory = new DirectoryInfo(@"D:\Data\");
        FileInfo[] array_FileInfo = directory.GetFiles("*.txt", SearchOption.TopDirectoryOnly);

        Parallel.ForEach(array_FileInfo, fileInfo => 
        {
            string[] array_Lines = File.ReadAllLines(fileInfo.FullName);

            // do some CPU-intensive data parsing and then add the processed data to the blocking collection
            // It has to be inserted in pairs (key = file path, value = list of strings to be written to this file)

        });
    }

    private void WriteDataToFiles()
    {
        foreach (var item in bc.GetConsumingEnumerable())
        {
            foreach (var key in item.Keys)
            {
                File.WriteAllLines(key, item[key]);
            }
        }

    }
}

Solution

  • Consider using Tuple instead of a Dictionary inside the BlockingCollection. Additionally, you need a call to CompleteAdding() to end the foreach in WriteDataToFiles.

    BlockingCollection<Tuple<string, List<string>>> bc = new BlockingCollection<Tuple<string, List<string>>>();
    
    private void GenerateDataFiles()
    {
        DirectoryInfo directory = new DirectoryInfo(@"D:\Data\");
        FileInfo[] array_FileInfo = directory.GetFiles("*.txt", SearchOption.TopDirectoryOnly);
    
        Parallel.ForEach(array_FileInfo, fileInfo => 
        {
            string[] array_Lines = File.ReadAllLines(fileInfo.FullName);
    
            // do some CPU-intensive data parsing and then add the processed data to the blocking collection
            // It has to be inserted in pairs (key = file path, value = list of strings to be written to this file)
            List<string> processedData = new List<string>();  // ... and add content
            bc.Add(new Tuple<string, List<string>>(fileInfo.FullName, processedData));
        });
        bc.CompleteAdding();
    }
    
    private void WriteDataToFiles()
    {
        foreach (var tuple in bc.GetConsumingEnumerable())
        {
            File.WriteAllLines(tuple.Item1, tuple.Item2);
        }
    }