Search code examples
c#task-parallel-librarytpl-dataflow

How to search vast code base for multiple literal strings efficiently?


This question is a follow up on How to optimize performance in a simple TPL DataFlow pipeline?

The source code is here - https://github.com/MarkKharitonov/LearningTPLDataFlow

Given:

  • Several solutions covering about 400 C# projects encompassing thousands of C# source files totaling in more than 10,000,000 lines of code.
  • A file containing string literals, one per line.

I want to produce a JSON file listing all the occurrences of the literals in the source code. For every matching line I want to have the following pieces of information:

  • The project path
  • The C# file path
  • The matching line itself
  • The matching line number

And all the records arranged as a dictionary keyed by the respective literal.

So the challenge is to do it as efficiently as possible (in C#, of course).

The DataFlow pipeline can be found in this file - https://github.com/MarkKharitonov/LearningTPLDataFlow/blob/master/FindStringCmd.cs

Here it is:

private void Run(string workspaceRoot, string outFilePath, string[] literals, bool searchAllFiles, int workSize, int maxDOP1, int maxDOP2, int maxDOP3, int maxDOP4)
{
    var res = new SortedDictionary<string, List<MatchingLine>>();
    var projects = (workspaceRoot + "build\\projects.yml").YieldAllProjects();
    var progress = new Progress();

    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, Environment.ProcessorCount);

    var produceCSFiles = new TransformManyBlock<ProjectEx, CSFile>(p => YieldCSFiles(p, searchAllFiles), new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP1
    });
    var produceCSFileContent = new TransformBlock<CSFile, CSFile>(CSFile.PopulateContentAsync, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP2
    });
    var produceWorkItems = new TransformManyBlock<CSFile, (CSFile CSFile, int Pos, int Length)>(csFile => csFile.YieldWorkItems(literals, workSize, progress), new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP3,
        TaskScheduler = taskSchedulerPair.ConcurrentScheduler
    });
    var produceMatchingLines = new TransformManyBlock<(CSFile CSFile, int Pos, int Length), MatchingLine>(o => o.CSFile.YieldMatchingLines(literals, o.Pos, o.Length, progress), new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP4,
        TaskScheduler = taskSchedulerPair.ConcurrentScheduler
    });
    var getMatchingLines = new ActionBlock<MatchingLine>(o => AddResult(res, o));

    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    produceCSFiles.LinkTo(produceCSFileContent, linkOptions);
    produceCSFileContent.LinkTo(produceWorkItems, linkOptions);
    produceWorkItems.LinkTo(produceMatchingLines, linkOptions);
    produceMatchingLines.LinkTo(getMatchingLines, linkOptions);

    var progressTask = Task.Factory.StartNew(() =>
    {
        var delay = literals.Length < 10 ? 1000 : 10000;
        for (; ; )
        {
            var current = Interlocked.Read(ref progress.Current);
            var total = Interlocked.Read(ref progress.Total);
            Console.Write("Total = {0:n0}, Current = {1:n0}, Percents = {2:P}   \r", total, current, ((double)current) / total);
            if (progress.Done)
            {
                break;
            }
            Thread.Sleep(delay);
        }
        Console.WriteLine();
    }, TaskCreationOptions.LongRunning);

    projects.ForEach(p => produceCSFiles.Post(p));
    produceCSFiles.Complete();
    getMatchingLines.Completion.GetAwaiter().GetResult();
    progress.Done = true;
    progressTask.GetAwaiter().GetResult();

    res.SaveAsJson(outFilePath);
}

The default parameters are (https://github.com/MarkKharitonov/LearningTPLDataFlow/blob/master/FindStringCmd.cs#L24-L28):

private int m_maxDOP1 = 3;
private int m_maxDOP2 = 20;
private int m_maxDOP3 = Environment.ProcessorCount;
private int m_maxDOP4 = Environment.ProcessorCount;
private int m_workSize = 1_000_000;

My idea is to divide the work into work items, where a work item size is computed by multiplying the number of lines in the respective file by the count of the string literals. So, if a C# file contains 500 lines, then searching it for all the 3401 literals results in a work of size 3401 * 500 = 1700500

The unit of work is by default 1000000 lines, so in the aforementioned example the file would result in 2 work items:

  1. Literals 0..1999
  2. Literals 2000..3400

And it is the responsibility of the produceWorkItems block to generate these work items from files.

Example runs:

C:\work\TPLDataFlow [master ≡]> .\bin\Debug\net5.0\TPLDataFlow.exe find-string -d C:\xyz\tip -o c:\temp -l C:\temp\2.txt
Locating all the instances of the 4 literals found in the file C:\temp\2.txt in the C# code ...
Total = 49,844,516, Current = 49,702,532, Percents = 99.72%
Elapsed: 00:00:18.4320676
C:\work\TPLDataFlow [master ≡]> .\bin\Debug\net5.0\TPLDataFlow.exe find-string -d C:\xyz\tip -o c:\temp -l C:\temp\1.txt
Locating all the instances of the 3401 literals found in the file c:\temp\1.txt in the C# code ...
Total = 42,379,095,775, Current = 42,164,259,870, Percents = 99.49%
Elapsed: 01:44:13.4289270

Question

Many work items are undersized. If I have 3 C# files, 20 lines each, my current code would produce 3 work items, because in my current implementation work items never cross a file boundary. This is inefficient. Ideally, they would be batched into a single work item, because 60 * 3401 = 204060 < 1000000. But the BatchBlock cannot be used here, because it expects me to provide the batch size, which I do not know - it depends on the work items in the pipeline.

How would you achieve such batching ?


Solution

  • Regarding the first question (configuring the pipeline), I can't really offer any guidance. Optimizing the parameters of a dataflow pipeline seems like a black art to me!

    Regarding the second question (how to batch a work load consisting of work items having unknown size at compile time), you could use the custom BatchBlock<T> below. It uses the DataflowBlock.Encapsulate method in order to combine two dataflow blocks to one. The first block in an ActionBlock<T> that receives the input and puts it into a buffer, and the second is a BufferBlock<T[]> that holds the batched items and propagates them downstream. The weightSelector is a lambda that returns the weight of each received item. When the accumulated weight surpasses the batchWeight threshold, a batch is emitted.

    public static IPropagatorBlock<T, T[]> CreateDynamicBatchBlock<T>(
        int batchWeight, Func<T, int> weightSelector,
        DataflowBlockOptions options = null)
    {
        // Arguments validation omitted
        options ??= new DataflowBlockOptions();
        var outputBlock = new BufferBlock<T[]>(options);
        List<T> buffer = new List<T>();
        int sumWeight = 0;
    
        var inputBlock = new ActionBlock<T>(async item =>
        {
            checked
            {
                int weight = weightSelector(item);
                if (weight + sumWeight > batchWeight && buffer.Count > 0)
                    await SendBatchAsync();
                buffer.Add(item);
                sumWeight += weight;
                if (sumWeight >= batchWeight) await SendBatchAsync();
            }
        }, new()
        {
            BoundedCapacity = options.BoundedCapacity,
            CancellationToken = options.CancellationToken,
            TaskScheduler = options.TaskScheduler,
            MaxMessagesPerTask = options.MaxMessagesPerTask,
            NameFormat = options.NameFormat
        });
    
        PropagateCompletion(inputBlock, outputBlock, async () =>
        {
            if (buffer.Count > 0) await SendBatchAsync();
        });
    
        Task SendBatchAsync()
        {
            var batch = buffer.ToArray();
            buffer.Clear();
            sumWeight = 0;
            return outputBlock.SendAsync(batch);
        }
    
        static async void PropagateCompletion(IDataflowBlock source,
            IDataflowBlock target, Func<Task> postCompletionAction)
        {
            try { await source.Completion.ConfigureAwait(false); } catch { }
            Exception ex =
                source.Completion.IsFaulted ? source.Completion.Exception : null;
            try { await postCompletionAction(); }
            catch (Exception actionError) { ex = actionError; }
            if (ex != null) target.Fault(ex); else target.Complete();
        }
    
        return DataflowBlock.Encapsulate(inputBlock, outputBlock);
    }
    

    Usage example:

    var batchBlock = CreateDynamicBatchBlock<WorkItem>(1_000_000, wi => wi.Size);
    

    If the weight int type has not enough range and overflows, you could switch to long or double.