Tags: .net, multithreading, stream, thread-safety, producer-consumer

An approach to multithreaded file processing


I have quite a large file (> 15 GB); never mind what kind of file. I have to read the file, do some processing on the data, and then write the processed data to a new, blank file. I do this in chunks. Each chunk contains a header of some sort, followed by the data. The simplest file of multiple chunks would contain:

Number of block bytes
Block bytes
Number of block bytes
Block bytes
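
Reading that layout looks roughly like this (a sketch only; I'm assuming here that the "number of block bytes" header is a 4-byte integer read with BinaryReader, and the real header size depends on the format):

using (FileStream fs = File.OpenRead(path))
using (var reader = new BinaryReader(fs))
{
    while (fs.Position < fs.Length)
    {
        int blockLength = reader.ReadInt32();          // number of block bytes
        byte[] block = reader.ReadBytes(blockLength);  // the block bytes themselves
        // hand "block" off for processing...
    }
}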

So, I create one thread that reads the file in chunks, several threads that process each chunk that has been read, and one thread that writes the processed data back out in chunks.

And I have a problem managing those threads.

I don't know the order in which the chunks will finish processing, but I must write them to the output file in the same order they were read.

So, my question is: what kind of approach should I use to manage this multithreaded processing?

I guess it might be better if I use the producer-consumer pattern. What data structure is best in that case for storing the data that has already been processed? I have one guess: an array-based stack that I sort once before I start writing.

But I'm not sure, so please help me with an approach.
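
For example, the sort-once idea would look something like this (just a sketch; it assumes my MyStack exposes ToArray(), all processing threads have been joined first, and output is the destination stream):

DataBlock[] blocks = processedBlockStore.ToArray();
Array.Sort(blocks, (a, b) => a.Index.CompareTo(b.Index)); // restore read order
foreach (DataBlock block in blocks)
{
    output.Write(block.Data, 0, block.Data.Length);
}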

// a sample of my code, but without any thread-management logic

public class DataBlock
{
    public byte[] Data { get; }
    public long Index { get; }

    public DataBlock(byte[] data, long index)
    {
        this.Data = data;
        this.Index = index;
    }
}


int bufferSize = 1024 * 64; // 65536
long processedBlockCounter = 0L;
MyStack<DataBlock> processedBlockStore = new MyStack<DataBlock>();

using (FileStream fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize))
{
    using (BufferedStream bs = new BufferedStream(fs, bufferSize))
    {
        byte[] buffer = new byte[bufferSize];
        int byteRead;
        while ((byteRead = bs.Read(buffer, 0, bufferSize)) > 0)
        {
            // copy the bytes out of the shared buffer before the next iteration reuses it
            byte[] originalBytes = new byte[byteRead];
            Array.Copy(buffer, originalBytes, byteRead);

            long dataBlockIndex = Interlocked.Increment(ref processedBlockCounter);

            Thread processThread = new Thread(() =>
            {
                byte[] processedBytes = MyProcessor.Process(originalBytes);
                // use the index captured above; reading processedBlockCounter here
                // would race with the other threads still incrementing it
                DataBlock processedBlock = new DataBlock(processedBytes, dataBlockIndex);
                lock (processedBlockStore)
                {
                    processedBlockStore.Add(processedBlock);
                }
            });
            processThread.Start();
        }
    }
}

Solution

  • You're creating a new thread for each iteration. That isn't going to scale. I'd recommend you use the ThreadPool instead. The preferred way is to use the TPL, which uses the ThreadPool internally (see the sketch after this list).

    Since you need both ordering and parallel processing, and the two don't go hand in hand, one option is to make your code completely synchronous, if that's acceptable.

    If you do need to process in parallel, I'd recommend the following fork-join strategy, given that your file is larger than 15 GB and your processing is time-consuming too:

    • Chunkify your file
    • Start a Task for each chunk
    • Make each Task write its output to a temporary file named by index: 1.txt, 2.txt, etc.
    • Wait for all Tasks to complete
    • Finally, read those temporary files and create your output file in order
    • Then, of course, delete the temporary files. You're done.
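
    A rough sketch of that strategy (the chunk size, the file names, and the MyProcessor.Process call are placeholders; a real implementation would also throttle how far it reads ahead, since this naive version can hold many chunks in memory at once):

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Threading.Tasks;

    string inputPath = "input.dat";    // placeholder paths
    string outputPath = "output.dat";
    int chunkSize = 1024 * 1024 * 64;  // 64 MB per chunk keeps the Task count manageable

    var tasks = new List<Task>();
    int chunkCount = 0;

    // Fork: one Task per chunk, each writing its result to an index-named temp file.
    using (FileStream input = File.OpenRead(inputPath))
    {
        var buffer = new byte[chunkSize];
        int read;
        while ((read = input.Read(buffer, 0, buffer.Length)) > 0)
        {
            var chunk = new byte[read];
            Array.Copy(buffer, chunk, read);
            int index = ++chunkCount;          // per-chunk copy for the closure
            tasks.Add(Task.Run(() =>
            {
                byte[] processed = MyProcessor.Process(chunk);
                File.WriteAllBytes(index + ".tmp", processed);
            }));
        }
    }

    Task.WaitAll(tasks.ToArray());             // join

    // Merge the temp files back in read order, then clean up.
    using (FileStream output = File.Create(outputPath))
    {
        for (int i = 1; i <= chunkCount; i++)
        {
            string tmp = i + ".tmp";
            using (FileStream part = File.OpenRead(tmp))
            {
                part.CopyTo(output);
            }
            File.Delete(tmp);
        }
    }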