c# tpl-dataflow

TPL Dataflow: ActionBlock that avoids repeatedly running a using-block (such as for writing to a StreamWriter) on every invocation of its delegate


I need to read 1M rows from an IDataReader, and write n text files simultaneously. Each of those files will be a different subset of the available columns; all n text files will be 1M lines long when complete.

Current plan is one TransformManyBlock to iterate the IDataReader, linked to a BroadcastBlock, linked to n BufferBlock/ActionBlock pairs.
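
Roughly, the wiring would look like this (Record, ReadRecords and WriteSubset1 are placeholders for the real row type and column projections):

    var readerBlock = new TransformManyBlock<IDataReader, Record>(rdr => ReadRecords(rdr));
    var broadcast   = new BroadcastBlock<Record>(r => r);
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
    
    //one BufferBlock/ActionBlock pair per output file
    var buffer1 = new BufferBlock<Record>();
    var writer1 = new ActionBlock<Record>(r => WriteSubset1(r));
    
    readerBlock.LinkTo(broadcast, linkOptions);
    broadcast.LinkTo(buffer1, linkOptions);
    buffer1.LinkTo(writer1, linkOptions);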

What I'm trying to avoid is having my ActionBlock delegate perform a using (StreamWriter x...) { x.WriteLine(); } that would open and close every output file a million times over.

My current thought is, in lieu of ActionBlock, to write a custom class that implements ITargetBlock<>. Is there a simpler approach?

EDIT 1: The discussion is of value for my current problem, but the answers so far have become hyper-focused on file system behavior. For the benefit of future searchers, the thrust of the question was how to build some kind of setup/teardown outside the ActionBlock delegate. This would apply to any kind of disposable that you would ordinarily wrap in a using-block.

EDIT 2: Per @Panagiotis Kanavos, the executive summary of the solution is to set up the object before defining the block, then tear it down in the block's Completion.ContinueWith.
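
In pattern form (SomeDisposable and Use are placeholders for whatever resource and work the block needs):

    var resource = new SomeDisposable();                        //setup, outside the block
    var block = new ActionBlock<Record>(msg => resource.Use(msg));
    block.Completion.ContinueWith(_ => resource.Dispose());     //teardown, after completion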


Solution

  • Writing to a file one line at a time is expensive in itself even when you don't have to open the stream each time. Keeping a file stream open has other issues too, as file streams are always buffered, from the FileStream level all the way down to the file system driver, for performance reasons. You'd have to flush the stream periodically to ensure the data was written to disk.

    To really improve performance you'd have to batch the records, eg with a BatchBlock. Once you do that, the cost of opening the stream becomes negligible.

    The lines should be generated at the last possible moment too, to avoid generating temporary strings that will need to be garbage collected. At n*1M records, the memory and CPU overhead of those allocations and garbage collections would be severe.

    Logging libraries batch log entries before writing to avoid this performance hit.

    You can try something like this:

    var batchBlock=new BatchBlock<Record>(1000);
    var writerBlock=new ActionBlock<Record[]>( records => {
       
        //Create or open a file for appending
        using var writer=new StreamWriter(ThePath,true);
        foreach(var record in records)
        {
            writer.WriteLine("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2);
        }
    
    });
    
    batchBlock.LinkTo(writerBlock,options);
    

    or, using asynchronous methods

    var batchBlock=new BatchBlock<Record>(1000);
    var writerBlock=new ActionBlock<Record[]>(async records => {
       
        //Create or open a file for appending
        await using var writer=new StreamWriter(ThePath,true);
        foreach(var record in records)
        {
            //WriteLineAsync has no format-string overload, so build the line first
            await writer.WriteLineAsync(string.Format("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2));
        }
    
    });
    
    batchBlock.LinkTo(writerBlock,options);
    

    You can adjust the batch size and the StreamWriter's buffer size for optimum performance.
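
    For example, options could be a DataflowLinkOptions that propagates completion, and the StreamWriter constructor accepts an explicit buffer size; a rough sketch (the 64 KB buffer is just an illustrative starting point, not a prescribed value):

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    batchBlock.LinkTo(writerBlock, options);
    
    //Inside the writer delegate: a larger buffer means fewer flushes per batch
    using var writer = new StreamWriter(ThePath, true, Encoding.UTF8, 64 * 1024);
    
    //When the producer has posted everything, complete the head and await the tail
    batchBlock.Complete();
    await writerBlock.Completion;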

    Creating an actual "Block" that writes to a stream

    A custom block can be created using the technique shown in the Custom Dataflow block walkthrough - instead of creating an actual custom block, create something that returns whatever is needed for LinkTo to work, in this case an ITargetBlock<T>:

    ITargetBlock<Record> FileExporter(string path)
    {
        var writer=new StreamWriter(path,true);
        var block=new ActionBlock<Record>(async record=>{
            //WriteLineAsync has no format-string overload, so build the line first
            await writer.WriteLineAsync(string.Format("{0} = {1} :{2}",record.Prop1, record.Prop5, record.Prop2));
        });
    
        //Close the stream when the block completes
        block.Completion.ContinueWith(_=>writer.Close());
        return (ITargetBlock<Record>)block;
    }
    ...
    
    
    var exporter1=FileExporter(path1);
    previous.LinkTo(exporter1,options);
    

    The "trick" here is that the stream is created outside the block and remains active until the block completes. It's not garbage-collected because it's used by other code. When the block completes, we need to explicitly close it, no matter what happened. block.Completion.ContinueWith(_=>write.Close()); will close the stream whether the block completed gracefully or not.

    This is the same technique used in the Walkthrough to complete the output BufferBlock:

    target.Completion.ContinueWith(delegate
    {
       if (queue.Count > 0 && queue.Count < windowSize)
          source.Post(queue.ToArray());
       source.Complete();
    });
    

    Streams are buffered by default, so calling WriteLine doesn't mean the data will actually be written to disk. This means we don't know when the data will actually be written to the file. If the application crashes, some data may be lost.
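
    If losing the last buffered lines on a crash is unacceptable, the writer can be flushed explicitly at the end of each batch; a small sketch, assuming the batched writer shown earlier:

    //At the end of each batch: push the StreamWriter's buffer down to the OS
    await writer.FlushAsync();
    
    //Or flush after every line (much slower):
    //writer.AutoFlush = true;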

    Memory, IO and overheads

    When working with 1M rows over a significant period of time, things add up. One could use e.g. File.AppendAllLinesAsync to write batches of lines at once, but that would result in the allocation of 1M temporary strings. At each iteration, the runtime would have to use at least as much RAM for those temporary strings as the batch itself. RAM usage would start ballooning to hundreds of MBs, then GBs, before the GC fired, freezing the threads.

    With 1M rows and lots of data it's hard to debug and track data in the pipeline. If something goes wrong, things can crash very quickly. Imagine for example 1M messages stuck in one block because one message got blocked.
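
    One way to keep a stalled block from buffering messages without bound is to cap its input queue; a rough sketch using BoundedCapacity (the value is arbitrary, and WriteBatch stands in for the batched write shown earlier):

    //Note: a BroadcastBlock won't wait for a full bounded target and may skip it,
    //so capacities downstream of a broadcast need careful sizing.
    var writerBlock = new ActionBlock<Record[]>(records => WriteBatch(records),
        new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });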

    It's important (for sanity and performance reasons) to keep individual components in the pipeline as simple as possible.