c#, tpl-dataflow

TPL Dataflow: process each file sequentially, but each line within a file in parallel


So my use case requires me to process a list of files, where for every file in the list I go through each line and do some calculations on those lines. Now my problem is that I cannot have multiple files' lines in my buffer block at the same time, so I basically need to make sure that one file is completely processed (through a series of dataflow blocks) before I even enter the second file.

Now I looked at TPL DataFlow One by one processing, where the answer says either to stop using TPL Dataflow altogether or to encapsulate multiple processing blocks into one so I can control it. But if I do that I would lose the "composability" that TPL Dataflow provides, and it also seems a bit wasteful to lump independent blocks together. Is there some other way to do this?

I thought of using OutputAvailableAsync at the leaf node to notify me when everything has been flushed out before I post another file. But I couldn't get OutputAvailableAsync to work at all; it just waits forever.
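
For reference, this is the kind of consumption loop I was attempting (a minimal sketch; DrainAsync is just a hypothetical wrapper standing in for my real consumer). As far as I understand, OutputAvailableAsync only completes with false after the block itself has completed, so if completion never propagates down to the leaf block, the await hangs:

    async Task DrainAsync(IReceivableSourceBlock<string[]> leaf)
    {
        // OutputAvailableAsync completes with true when an item is available,
        // and with false only after the block has completed and been drained.
        // If Complete() never propagates to the leaf, this awaits forever.
        while (await leaf.OutputAvailableAsync())
        {
            while (leaf.TryReceive(out var lineParts))
            {
                // consume lineParts
            }
        }
    }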

EDIT

Down the pipeline I would have an ActionBlock with state, for which I am planning to use a ConcurrentDictionary (for each line in a file I have multiple things of note). Now I cannot possibly index each line, because that would mean keeping the state for N files being processed together, where N would probably be the number of files to be processed.
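
Roughly what I have in mind for that stateful block, as a sketch only (the key and the "things of note" here are placeholders, not my real calculation):

    var notesByKey = new ConcurrentDictionary<string, List<string>>();

    var statefulBlock = new ActionBlock<string[]>(lineParts =>
    {
        // With the ActionBlock's default MaxDegreeOfParallelism of 1 the
        // dictionary is touched by one message at a time, so mutating the
        // List inside the update delegate is safe.
        notesByKey.AddOrUpdate(
            lineParts[0],
            _ => new List<string> { lineParts[1] },
            (_, notes) => { notes.Add(lineParts[1]); return notes; });
    });

This only works if the block sees one file at a time; otherwise notesByKey would need an extra per-file dimension, which is exactly the state explosion described above.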

This is what I have for the pipeline so far; bear in mind I have just coded out a proof of concept.

    public static IPropagatorBlock<string, string[]> CreatePipeline(int batchSize)
    {
        // batchSize is currently unused in this proof of concept.
        var fileReadingBlock = new TransformManyBlock<string, string>(
            filePath => File.ReadLines(filePath),
            new ExecutionDataflowBlockOptions
            {
                EnsureOrdered = true,
                MaxDegreeOfParallelism = Environment.ProcessorCount
            });

        var fileParsingBlock = new TransformBlock<string, string[]>(
            line => line.Split(','),
            new ExecutionDataflowBlockOptions
            {
                EnsureOrdered = true,
                MaxDegreeOfParallelism = Environment.ProcessorCount
            });

        // Encapsulate only wraps the two blocks; it does not link them.
        // Without this link (and completion propagation) nothing flows
        // from the reader to the parser.
        fileReadingBlock.LinkTo(fileParsingBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        return DataflowBlock.Encapsulate(fileReadingBlock, fileParsingBlock);
    }
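
And this is roughly how I exercise it (the paths are made up). Nothing tells the downstream blocks where one file ends and the next begins, which is exactly my problem:

    var pipeline = CreatePipeline(batchSize: 100);

    var consumer = new ActionBlock<string[]>(parts =>
        Console.WriteLine(string.Join("|", parts)));

    pipeline.LinkTo(consumer,
        new DataflowLinkOptions { PropagateCompletion = true });

    // Both files are in flight at once; their lines pass through the
    // same shared blocks with no per-file boundary.
    pipeline.Post(@"C:\data\a.csv");
    pipeline.Post(@"C:\data\b.csv");
    pipeline.Complete();
    await consumer.Completion;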

Solution

  • You could take advantage of the conditional linking capabilities of TPL Dataflow to create a pipeline that is partially shared and partially dedicated. A single reader block and a single parser block would be shared by all files, while a dedicated processor block would be created for each file. Here is a simple demonstration of the concept:

    var parser = new TransformBlock<(int Id, string Line), (int Id, string[] LineParts)>(line =>
    {
        return (line.Id, line.Line?.Split(','));
    });

    var reader = new TransformManyBlock<(int Id, string Path), (int Id, string Line)>(file =>
    {
        var processor = CreateProcessor(file.Id);

        // Create a conditional link from the parser block to the processor block
        var link = parser.LinkTo(processor, entry => entry.Id == file.Id);

        return File
            .ReadLines(file.Path)
            .Select(line => (file.Id, line))
            .Append((file.Id, null)); // Completion signal
    });

    ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId)
    {
        var streamWriter = new StreamWriter($@"C:\{fileId}.out");

        return new ActionBlock<(int Id, string[] LineParts)>(line =>
        {
            if (line.LineParts == null)
            {
                streamWriter.Close(); // Completion signal received
                return;
            }
            streamWriter.WriteLine(string.Join("|", line.LineParts));
        });
    }

    // Propagating completion makes it possible to await parser.Completion
    // after all the files have been posted.
    reader.LinkTo(parser, new DataflowLinkOptions { PropagateCompletion = true });
    
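    A hypothetical driver for this demonstration (the ids and paths are placeholders):

        reader.Post((1, @"C:\in\1.csv"));
        reader.Post((2, @"C:\in\2.csv"));
        reader.Complete();

        // The PropagateCompletion link above carries the completion to the
        // parser. The per-file processors signal end-of-file via the null
        // payload rather than via their Completion task.
        await parser.Completion;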

    In this example each file is associated with an int Id. This Id is passed along with each line, in order to be able to reconstruct the file downstream. Value tuples are used to combine each piece of data with the Id of its originating file. A conditional link is created between the shared parser block and each dedicated processor block. A null payload is used as an end-of-file indicator. Upon receiving this signal, a processor block should ideally unlink itself from the parser, in order to keep the overhead of the conditional linking mechanism to a minimum. The unlinking is performed by disposing the link returned by the LinkTo method. For simplicity's sake this important step has been omitted from the above example; a possible implementation is sketched below.
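
    For completeness, one way the unlinking could be wired up (the extra unlink parameter on CreateProcessor is an addition for this sketch, not part of the minimal demonstration above):

        var reader = new TransformManyBlock<(int Id, string Path), (int Id, string Line)>(file =>
        {
            IDisposable link = null;
            var processor = CreateProcessor(file.Id, () => link.Dispose());

            // The link is assigned before the block emits any of this file's
            // lines, so it is guaranteed to be non-null by the time the
            // completion signal reaches the processor.
            link = parser.LinkTo(processor, entry => entry.Id == file.Id);

            return File
                .ReadLines(file.Path)
                .Select(line => (file.Id, line))
                .Append((file.Id, null)); // Completion signal
        });

        ActionBlock<(int Id, string[] LineParts)> CreateProcessor(int fileId, Action unlink)
        {
            var streamWriter = new StreamWriter($@"C:\{fileId}.out");

            return new ActionBlock<(int Id, string[] LineParts)>(line =>
            {
                if (line.LineParts == null)
                {
                    streamWriter.Close();
                    unlink(); // Dispose the conditional link, so the parser
                              // stops evaluating this file's predicate.
                    return;
                }
                streamWriter.WriteLine(string.Join("|", line.LineParts));
            });
        }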

    I should probably repeat here what I have already written in my answer to a previous related question: passing individual strings from block to block will result in significant overhead. Chunkifying (batching) the workload is the way to go, in order to ensure that the pipeline will perform as smoothly (friction-free) as possible.
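
    As a rough illustration of that batching idea (the chunk size of 1000 is arbitrary, and Enumerable.Chunk requires .NET 6 or later; on earlier runtimes a BatchBlock<string> placed between the two blocks achieves a similar effect):

        var reader = new TransformManyBlock<string, string[]>(path =>
            File.ReadLines(path).Chunk(1000)); // emit arrays of lines, not single lines

        var parser = new TransformBlock<string[], string[][]>(chunk =>
            chunk.Select(line => line.Split(',')).ToArray());

        reader.LinkTo(parser,
            new DataflowLinkOptions { PropagateCompletion = true });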