Tags: c#, multithreading, parallel.foreach, concurrentdictionary

ConcurrentDictionary and ConcurrentBag for AddOrUpdate in parallel


Is this the correct way to use ConcurrentDictionary and ConcurrentBag to AddOrUpdate values?

Basically, I tried to do the following:

  1. I have a file with millions of records and am trying to process each one and extract it into an object.

  2. Each entry is a key-value pair, with Key = WBAN and the Value being the object.

     var cd = new ConcurrentDictionary<String, ConcurrentBag<Data>>();
     int count = 0;

     foreach (var line in File.ReadLines(path).AsParallel().WithDegreeOfParallelism(5))
     {
         var sInfo = line.Split(new char[] { ',' });
         cd.AddOrUpdate(sInfo[0],
             new ConcurrentBag<Data>()
             {
                 new Data()
                 {
                     WBAN = sInfo[0],
                     Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
                     time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
                 }
             },
             (oldKey, oldValue) =>
             {
                 oldValue.Add(new Data()
                 {
                     WBAN = sInfo[0],
                     Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
                     time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
                 });

                 return oldValue;
             });
     }
    

Solution

    • Your program is IO-bound, not CPU-bound, so there is no advantage to parallelizing your processing.
      • It's IO-bound because your program can't process a line of data without having first read that line from the file, and computers generally read data from storage far more slowly than they can process it.
      • As your program performs only trivial string operations on each line read, I can safely say with 99.9% confidence that the time it takes to add a Data element to a Dictionary<String,List<Data>> is a tiny fraction of the time it takes your computer to read a single line from a text file.
    • Also, avoid using File.ReadAllLines for programs like these, because that reads the entire file into memory before returning. (File.ReadLines, which you're using, does stream lazily, but it only supports synchronous IO; see the short comparison after this list.)
      • If you look at my solution, you'll see it uses a StreamReader to read each line one-by-one with asynchronous IO, which means it never needs to hold the whole file in memory.
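
    For reference, here's the difference between the two (a minimal sketch; path is a placeholder):

    // Eager: allocates a String[] holding every line before returning.
    String[] eager = File.ReadAllLines( path );

    // Lazy: yields one line at a time as you enumerate, but each read is synchronous.
    IEnumerable<String> lazy = File.ReadLines( path );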

    So to parse that file with the best possible performance, you don't need any concurrent collections.

    Just this:

    private static readonly Char[] _sep = new Char[] { ',' }; // Declared here to ensure only a single array allocation.
    
    public static async Task< Dictionary<String,List<Data>> > ReadFileAsync( FileInfo file )
    {
        const Int32 ONE_MEGABYTE = 1 * 1024 * 1024; // Use 1MB+ sized buffers for async IO. Not smaller buffers like 1024 or 4096 as those are for synchronous IO.
    
        Dictionary<String,List<Data>> dict = new Dictionary<String,List<Data>>( capacity: 1024 );
    
        using( FileStream fs = new FileStream( file.FullName, FileMode.Open, FileAccess.Read, FileShare.Read, ONE_MEGABYTE, FileOptions.Asynchronous | FileOptions.SequentialScan ) )
        using( StreamReader rdr = new StreamReader( fs ) )
        {
            String line;
            while( ( line = await rdr.ReadLineAsync().ConfigureAwait(false) ) != null )
            {
                String[] values = line.Split( _sep );
                if( values.Length < 3 ) continue;
    
                Data d = new Data()
                {
                    WBAN = values[0],
                    Date = values[1],
                    time = values[2]
                };
    
                // Get the existing list for this WBAN, or create and store a new one.
                if( !dict.TryGetValue( d.WBAN, out List<Data> list ) )
                {
                    dict[ d.WBAN ] = list = new List<Data>();
                }
    
                list.Add( d );
            }
        }
    
        return dict;
    }
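
    For completeness, here's a minimal Data class and call site the method above compiles against (the property names come from your code; the file path and everything else here is an assumption):

    public class Data
    {
        public String WBAN { get; set; }
        public String Date { get; set; }
        public String time { get; set; } // Lower-case name kept to match the code above.
    }
    
    // Example call site, e.g. from an async Main in the same class as ReadFileAsync:
    Dictionary<String,List<Data>> byWban = await ReadFileAsync( new FileInfo( @"C:\data\weather.csv" ) );
    Console.WriteLine( $"Distinct WBAN keys: {byWban.Count}" );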
    

    Update: Hypothetically speaking...

    Hypothetically speaking, because file IO (especially asynchronous FileStream IO) uses large buffers (in this case, a ONE_MEGABYTE-sized buffer), the program could pass each buffer, as it's read sequentially, to a parallel processor.

    However, the problem is that the data inside that buffer cannot be trivially apportioned out to individual threads: because the length of a line is not fixed, a single thread still needs to scan through the entire buffer to find out where the line-breaks are. (Technically that scan itself could be parallelized somewhat, but it would add a huge amount of complexity, as you would also need to handle lines that cross buffer boundaries, buffers that contain only a single line, and so on.)

    And at this small scale, the overhead of using the thread-pool and concurrent collection types would erase the speed-up of parallel processing, given that the program would still largely be IO-bound.

    Now, if you had a file sized in the gigabytes, with Data records around 1KB each, then I would go into detail demonstrating how you could do it, because at that scale you probably would see a modest performance boost.
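
    If you're curious, here's a rough sketch of the shape that batched approach could take. It is not the detailed gigabyte-scale version: the batch size, the use of Parallel.ForEach with thread-local dictionaries, and the final merge step are all assumptions, and per the caveat above it would likely run no faster than the simple sequential version on a small file:

    // Sketch only: read lines sequentially into batches, parse batches in parallel, merge once per thread.
    private static IEnumerable<List<String>> ReadBatches( String path, Int32 batchSize )
    {
        List<String> batch = new List<String>( batchSize );
        foreach( String line in File.ReadLines( path ) ) // Lazy, sequential read.
        {
            batch.Add( line );
            if( batch.Count == batchSize )
            {
                yield return batch;
                batch = new List<String>( batchSize );
            }
        }
        if( batch.Count > 0 ) yield return batch;
    }
    
    public static Dictionary<String,List<Data>> ParseInParallel( String path )
    {
        Dictionary<String,List<Data>> merged = new Dictionary<String,List<Data>>();
        Object gate = new Object();
    
        Parallel.ForEach(
            ReadBatches( path, batchSize: 100_000 ),
            () => new Dictionary<String,List<Data>>(), // Each thread parses into its own local dictionary...
            ( batch, loopState, local ) =>
            {
                foreach( String line in batch )
                {
                    String[] v = line.Split( ',' );
                    if( v.Length < 3 ) continue;
    
                    if( !local.TryGetValue( v[0], out List<Data> list ) )
                    {
                        local[ v[0] ] = list = new List<Data>();
                    }
    
                    list.Add( new Data() { WBAN = v[0], Date = v[1], time = v[2] } );
                }
                return local;
            },
            local => // ...and each thread's dictionary is merged exactly once, under a lock.
            {
                lock( gate )
                {
                    foreach( KeyValuePair<String,List<Data>> kv in local )
                    {
                        if( !merged.TryGetValue( kv.Key, out List<Data> list ) )
                        {
                            merged[ kv.Key ] = list = new List<Data>();
                        }
                        list.AddRange( kv.Value );
                    }
                }
            } );
    
        return merged;
    }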