
How to properly parallelise a job relying heavily on I/O


I'm building a console application that has to process a bunch of data.

Basically, the application grabs references from a DB. For each reference, it parses the content of a file and makes some changes. The files are HTML files, and the process does heavy work with RegEx replacements (finding references and transforming them into links). The result is then stored on the file system and sent to an external system.

To summarize the process, sequentially:

var refs = GetReferencesFromDB(); // ~5000 DataRows returned
foreach (var @ref in refs) // `ref` is a C# keyword, so it is escaped as @ref
{
    var filePath = GetFilePath(@ref); // Looks up the path in a previously loaded file list
    var html = File.ReadAllText(filePath); // Read the HTML locally, or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath, convertedHtml); // Write the result locally, or to a network drive
    SendToWs(@ref, convertedHtml);
}

My program works correctly but is quite slow. That's why I want to parallelise the process.

So far, I have made a simple parallelization by adding AsParallel:

var refs = GetReferencesFromDB().AsParallel();
refs.ForAll(@ref => // `ref` is a C# keyword, so it is escaped as @ref
{
    var filePath = GetFilePath(@ref);
    var html = File.ReadAllText(filePath);
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath, convertedHtml);
    SendToWs(@ref, convertedHtml);
});

This simple change reduces the duration of the process by about 25%. However, my understanding is that parallelizing work that depends on I/O brings little benefit (or can even make things worse), because the I/O throughput won't magically double.

That's why I think I should change my approach: rather than parallelizing the whole process, create dependent, chained, queued tasks.

I.e., I should create a flow like:

Queue a file read. When it finishes, queue ParseHtml. When that finishes, queue both the send to the WS and the local write. When those finish, log the result.

However, I don't know how to implement such a thing.

I feel it will end up as a set of producer/consumer queues, but I haven't found a good sample.

Moreover, I'm not sure there will be any benefit.

Thanks for any advice.

[Edit] In fact, I'm the perfect candidate for using C# 5 (.NET 4.5)... if only it was RTM :)

[Edit 2] Another thing that makes me think it's not correctly parallelized is that, in the resource monitor, the graphs of CPU, network I/O and disk I/O are not stable: when one is high, the others are low to medium.


Solution

  • You're not leveraging any async I/O APIs in any of your code. Everything runs synchronously, so every I/O operation wastes CPU resources by blocking a thread. AsParallel is for compute-bound tasks; if you want to take advantage of async I/O, you need to use the Asynchronous Programming Model (APM) based APIs available today in <= v4.0. You do this by looking for BeginXXX/EndXXX methods on the I/O classes you're using and using them whenever they're available.

    Read this post for starters: TPL TaskFactory.FromAsync vs Tasks with blocking methods
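As a minimal sketch of the BeginXXX/EndXXX pattern wrapped in a Task, assuming a small UTF-8 file that fits in a single read: the class name `FromAsyncSketch` and the helper `ReadSmallFileAsync` are hypothetical names made up for illustration, while `FileStream.BeginRead`/`EndRead` and `Task<int>.Factory.FromAsync` are the real framework APIs.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class FromAsyncSketch
{
    // Hypothetical helper: starts one asynchronous read of at most bufferSize bytes
    // and returns a Task that completes with the decoded text.
    // Assumption: the file is UTF-8 and small enough to arrive in a single read.
    public static Task<string> ReadSmallFileAsync(string path, int bufferSize)
    {
        var buffer = new byte[bufferSize];

        // The last argument (useAsync: true) asks the OS for overlapped I/O,
        // so no thread sits blocked while the read is in flight.
        var stream = new FileStream(path, FileMode.Open, FileAccess.Read,
                                    FileShare.Read, 4096, true);

        // FromAsync pairs BeginRead with EndRead and exposes the pair as a Task<int>.
        Task<int> readTask = Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);

        // The continuation runs once the read has completed (or failed).
        return readTask.ContinueWith(t =>
        {
            try
            {
                // t.Result rethrows any exception raised by EndRead.
                return Encoding.UTF8.GetString(buffer, 0, t.Result);
            }
            finally
            {
                stream.Dispose();
            }
        });
    }
}
```

Calling `FromAsyncSketch.ReadSmallFileAsync(path, 4096).ContinueWith(t => ParseHtml(t.Result))` would then chain the parsing step without a thread waiting on the disk.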

    Next, you don't want to use AsParallel in this case anyway. AsParallel enables streaming, which will result in immediately scheduling a new Task per item, but you don't need or want that here. You'd be much better served by partitioning the work using Parallel.ForEach.

    Let's see how you can use this knowledge to achieve max concurrency in your specific case:

    var refs = GetReferencesFromDB();
    
    // Using Parallel.ForEach here will partition and process your data on separate worker threads
    Parallel.ForEach(
        refs,
        @ref => // `ref` is a C# keyword, so the parameter is escaped as @ref
    { 
        string filePath = GetFilePath(@ref);
    
        byte[] fileDataBuffer = new byte[1048576];
    
        // Need to use FileStream API directly so we can enable async I/O
        FileStream sourceFileStream = new FileStream(
                                          filePath, 
                                          FileMode.Open,
                                          FileAccess.Read,
                                          FileShare.Read,
                                          8192,
                                          true);
    
        // Use FromAsync to read the data from the file
        Task<int> readSourceFileStreamTask = Task<int>.Factory.FromAsync(
                                                 sourceFileStream.BeginRead,
                                                 sourceFileStream.EndRead,
                                                 fileDataBuffer,
                                                 0,
                                                 fileDataBuffer.Length,
                                                 null);
    
        // Add a continuation that will fire when the async read is completed
        readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
        {
            int sourceFileStreamBytesRead;
    
            try
            {
                // Determine exactly how many bytes were read 
                // NOTE: this will propagate any potential exception that may have occurred in EndRead
                sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
            }
            finally
            {
                // Always clean up the source stream
                sourceFileStream.Close();
                sourceFileStream = null;
            }
    
            // This is here to make sure you don't end up trying to read files larger than this sample code can handle
            if(sourceFileStreamBytesRead == fileDataBuffer.Length)
            {
                throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
            }
    
            // Convert the file data to a string
            string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);
    
            // Parse the HTML
            string convertedHtml = ParseHtml(html);
    
            // This is here to make sure you don't end up trying to write files larger than this sample code can handle
            if(Encoding.UTF8.GetByteCount(convertedHtml) > fileDataBuffer.Length)
            {
                throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
            }
    
            // Convert the file data back to bytes for writing, remembering how many bytes were produced
            int destinationFileByteCount = Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);
    
            // Need to use FileStream API directly so we can enable async I/O
            // FileMode.Create truncates an existing file so no stale bytes are left behind
            FileStream destinationFileStream = new FileStream(
                                                   destinationFilePath,
                                                   FileMode.Create,
                                                   FileAccess.Write,
                                                   FileShare.None,
                                                   8192,
                                                   true);
    
            // Use FromAsync to write the data to the file
            Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                                                      destinationFileStream.BeginWrite,
                                                      destinationFileStream.EndWrite,
                                                      fileDataBuffer,
                                                      0,
                                                      destinationFileByteCount,
                                                      null);
    
            // Add a continuation that will fire when the async write is completed
            destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
            {
                try
                {
                    // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
                    destinationFileStreamWriteAntecedent.Wait();
                }
                finally
                {
                    // Always close the destination file stream
                    destinationFileStream.Close();
                    destinationFileStream = null;
                }
            },
            TaskContinuationOptions.AttachedToParent);
    
            // Send to external system **concurrent** to writing to destination file system above
            SendToWs(@ref, convertedHtml);
        },
        TaskContinuationOptions.AttachedToParent);
    });
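The sample above throws as soon as a file fills its fixed 1MB buffer. A compact sketch of chained asynchronous reads, which lifts that limit, might look like the following. It is not part of the original sample: `ChainedReadSketch` and `ReadAllTextAsync` are hypothetical names, the 8192-byte chunk size is an arbitrary assumption, and the bytes are collected in a `MemoryStream` and decoded once at the end so that a multi-byte UTF-8 character split across two chunks is not corrupted.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class ChainedReadSketch
{
    // Hypothetical helper: reads a whole UTF-8 file by chaining fixed-size
    // asynchronous reads until EndRead reports 0 bytes (end of file).
    public static Task<string> ReadAllTextAsync(string path)
    {
        var completion = new TaskCompletionSource<string>();
        var chunk = new byte[8192];           // assumed chunk size
        var collected = new MemoryStream();   // raw bytes, decoded once at the end
        var stream = new FileStream(path, FileMode.Open, FileAccess.Read,
                                    FileShare.Read, chunk.Length, true);

        Action readNext = null;
        readNext = () =>
        {
            Task<int>.Factory
                .FromAsync(stream.BeginRead, stream.EndRead, chunk, 0, chunk.Length, null)
                .ContinueWith(read =>
                {
                    try
                    {
                        int bytesRead = read.Result; // rethrows any EndRead failure
                        if (bytesRead > 0)
                        {
                            collected.Write(chunk, 0, bytesRead);
                            readNext();              // schedule the next chunk
                            return;
                        }

                        // End of file: clean up and publish the decoded text.
                        stream.Dispose();
                        completion.SetResult(Encoding.UTF8.GetString(collected.ToArray()));
                    }
                    catch (Exception ex)
                    {
                        stream.Dispose();
                        completion.SetException(ex);
                    }
                });
        };

        readNext();
        return completion.Task;
    }
}
```

Each continuation is scheduled on the thread pool rather than run inline, so the chain does not grow the call stack however many chunks the file needs. A chained write can follow the same shape with `BeginWrite`/`EndWrite`.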
    

    Now, here are a few notes:

    1. This is sample code, so I'm using a 1MB buffer to read/write files. This is excessive for HTML files and wasteful of system resources. You can either lower it to suit your maximum needs or implement chained reads/writes into a StringBuilder, which is an exercise I leave up to you since I'd be writing ~500 more lines of code to do async chained reads/writes. :P
    2. You'll note that the continuations for the read/write tasks use TaskContinuationOptions.AttachedToParent. This is very important, as it prevents the worker thread that Parallel.ForEach starts the work on from completing until all the underlying async calls have completed. If it were not there, you would kick off work for all 5000 items concurrently, which would pollute the TPL subsystem with thousands of scheduled Tasks and not scale properly at all.
    3. I call SendToWs concurrently with writing the file to the file share here. I don't know what underlies the implementation of SendToWs, but it too sounds like a good candidate for making async. Right now it's assumed to be pure compute work and, as such, is going to burn a CPU thread while executing. I leave it as an exercise for you to figure out how best to leverage what I've shown you to improve throughput there.
    4. This was all typed free-form; my brain was the only compiler here, and SO's syntax highlighting is all I used to make sure the syntax was good. So please forgive any syntax errors, and let me know if I screwed anything up so badly that you can't make heads or tails of it, and I'll follow up.
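On note 2: if you would rather make the cap on in-flight work explicit than rely on attached continuations, one alternative is a semaphore gate. This is a different technique from the one in the sample, shown only as a sketch. `ThrottleSketch`, `ForEachThrottled` and the `startAsync` delegate are hypothetical names; `SemaphoreSlim` and `Task.WaitAll` are the real .NET 4.0 APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Hypothetical helper: starts one asynchronous operation per item, but never
    // lets more than maxInFlight operations be outstanding at the same time.
    public static void ForEachThrottled<T>(
        IEnumerable<T> items, int maxInFlight, Func<T, Task> startAsync)
    {
        var gate = new SemaphoreSlim(maxInFlight);
        var operations = new List<Task>();
        var releases = new List<Task>();

        foreach (T item in items)
        {
            gate.Wait(); // blocks the producing thread until a slot is free

            Task operation;
            try
            {
                operation = startAsync(item);
            }
            catch
            {
                gate.Release(); // the operation never started, so give the slot back
                throw;
            }

            operations.Add(operation);

            // Free the slot when the operation finishes, whether it succeeded or failed.
            releases.Add(operation.ContinueWith(_ => { gate.Release(); }));
        }

        Task.WaitAll(releases.ToArray());   // every slot has been handed back
        gate.Dispose();
        Task.WaitAll(operations.ToArray()); // rethrows failures as an AggregateException
    }
}
```

Here `startAsync` would return the Task that represents the whole read, parse, write and send chain for one item, and `maxInFlight` is something to tune by measurement rather than a value this sketch can recommend.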