Is this the correct way to use ConcurrentDictionary and ConcurrentBag with AddOrUpdate?
Basically I am trying to do the following:
I have a file with millions of records, and I am processing it to extract each record into an object.
Each entry is a key-value pair, where the key is the WBAN and the value is a Data object.
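Data is a simple record type; a minimal sketch, assuming it has just the three fields used below:

public class Data
{
    public string WBAN { get; set; }
    public string Date { get; set; }
    public string time { get; set; } // lower-case "time" kept to match the property name used below
}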
var cd = new ConcurrentDictionary<String, ConcurrentBag<Data>>();
int count = 0;

foreach (var line in File.ReadLines(path).AsParallel().WithDegreeOfParallelism(5))
{
    var sInfo = line.Split(new char[] { ',' });

    cd.AddOrUpdate(
        sInfo[0],
        new ConcurrentBag<Data>()
        {
            new Data()
            {
                WBAN = sInfo[0],
                Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
                time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
            }
        },
        (oldKey, oldValue) =>
        {
            oldValue.Add(new Data()
            {
                WBAN = sInfo[0],
                Date = string.IsNullOrEmpty(sInfo[1]) ? "" : sInfo[1],
                time = string.IsNullOrEmpty(sInfo[2]) ? "" : sInfo[2]
            });
            return oldValue;
        });
}
Adding a Data element to a Dictionary<String,List<Data>> takes a tiny, tiny, tiny fraction of the time it takes for your computer to read a single line from a text file, so there is nothing to gain from parallelizing this work. Also, avoid File.ReadAllLines for programs like these, because it reads the entire file into memory up-front; use File.ReadLines or a StreamReader to read each line one-by-one, which means the program doesn't need to wait until everything has been read into memory first.

So to parse that file with the best possible performance you don't need any Concurrent collections.
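To see the difference (path here is a placeholder for your file path):

// Eager: allocates a string[] holding every line before any processing can start.
string[] allLines = File.ReadAllLines(path);

// Lazy: yields one line at a time as the enumeration advances, so processing
// can begin as soon as the first line has been read.
IEnumerable<string> lines = File.ReadLines(path);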
Just this:
private static readonly Char[] _sep = new Char[] { ',' }; // Declared here to ensure only a single array allocation.

public static async Task< Dictionary<String,List<Data>> > ReadFileAsync( FileInfo file )
{
    const Int32 ONE_MEGABYTE = 1 * 1024 * 1024; // Use 1MB+ sized buffers for async IO. Not smaller buffers like 1024 or 4096 as those are for synchronous IO.

    Dictionary<String,List<Data>> dict = new Dictionary<String,List<Data>>( capacity: 1024 );

    // Note the argument order: FileMode comes before FileAccess in this constructor.
    using( FileStream fs = new FileStream( file.FullName, FileMode.Open, FileAccess.Read, FileShare.Read, ONE_MEGABYTE, FileOptions.Asynchronous | FileOptions.SequentialScan ) )
    using( StreamReader rdr = new StreamReader( fs ) )
    {
        String line;
        while( ( line = await rdr.ReadLineAsync().ConfigureAwait(false) ) != null )
        {
            String[] values = line.Split( _sep );
            if( values.Length < 3 ) continue; // Skip malformed lines.

            Data d = new Data()
            {
                WBAN = values[0],
                Date = values[1],
                time = values[2]
            };

            if( !dict.TryGetValue( d.WBAN, out List<Data> list ) )
            {
                dict[ d.WBAN ] = list = new List<Data>();
            }

            list.Add( d );
        }
    }

    return dict;
}
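It would be called like this, for example ("2019.csv" is a placeholder file name):

FileInfo file = new FileInfo( "2019.csv" );
Dictionary<String,List<Data>> dict = await ReadFileAsync( file );

foreach( KeyValuePair<String,List<Data>> kvp in dict )
{
    Console.WriteLine( "WBAN {0}: {1} records", kvp.Key, kvp.Value.Count );
}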
Hypothetically speaking, because file IO (especially asynchronous FileStream IO) uses large buffers (in this case, a ONE_MEGABYTE-sized buffer), the program could pass each buffer, as it's read sequentially, into a parallel processor. The problem, however, is that the data inside a buffer cannot be trivially apportioned out to individual threads: line lengths are not fixed, so a single thread still has to scan the entire buffer to find out where the line-breaks are. That scanning could itself be parallelized somewhat, but it would add a huge amount of complexity (you would also need to handle lines that cross buffer boundaries, buffers that contain only a single line, and so on). And at this small scale, the overhead of the thread-pool and concurrent collection types would erase the speed-up of parallel processing, given the program would still be largely IO-bound.
Now, if you had a file sized in the gigabytes, with Data records sized around 1KB each, then I would go into detail demonstrating how to do it, because at that scale you probably would see a modest performance boost.