This question is a follow-up to How to optimize performance in a simple TPL DataFlow pipeline?
The source code is here - https://github.com/MarkKharitonov/LearningTPLDataFlow
Given:
I want to produce a JSON file listing all the occurrences of the literals in the source code. For every matching line I want to capture several pieces of information, and all the records should be arranged in a dictionary keyed by the respective literal.
So the challenge is to do it as efficiently as possible (in C#, of course).
The DataFlow pipeline can be found in this file - https://github.com/MarkKharitonov/LearningTPLDataFlow/blob/master/FindStringCmd.cs
Here it is:
private void Run(string workspaceRoot, string outFilePath, string[] literals, bool searchAllFiles, int workSize, int maxDOP1, int maxDOP2, int maxDOP3, int maxDOP4)
{
    var res = new SortedDictionary<string, List<MatchingLine>>();
    var projects = (workspaceRoot + "build\\projects.yml").YieldAllProjects();
    var progress = new Progress();
    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, Environment.ProcessorCount);

    var produceCSFiles = new TransformManyBlock<ProjectEx, CSFile>(p => YieldCSFiles(p, searchAllFiles), new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP1
    });
    var produceCSFileContent = new TransformBlock<CSFile, CSFile>(CSFile.PopulateContentAsync, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP2
    });
    var produceWorkItems = new TransformManyBlock<CSFile, (CSFile CSFile, int Pos, int Length)>(csFile => csFile.YieldWorkItems(literals, workSize, progress), new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP3,
        TaskScheduler = taskSchedulerPair.ConcurrentScheduler
    });
    var produceMatchingLines = new TransformManyBlock<(CSFile CSFile, int Pos, int Length), MatchingLine>(o => o.CSFile.YieldMatchingLines(literals, o.Pos, o.Length, progress), new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP4,
        TaskScheduler = taskSchedulerPair.ConcurrentScheduler
    });
    var getMatchingLines = new ActionBlock<MatchingLine>(o => AddResult(res, o));

    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    produceCSFiles.LinkTo(produceCSFileContent, linkOptions);
    produceCSFileContent.LinkTo(produceWorkItems, linkOptions);
    produceWorkItems.LinkTo(produceMatchingLines, linkOptions);
    produceMatchingLines.LinkTo(getMatchingLines, linkOptions);

    var progressTask = Task.Factory.StartNew(() =>
    {
        var delay = literals.Length < 10 ? 1000 : 10000;
        for (; ; )
        {
            var current = Interlocked.Read(ref progress.Current);
            var total = Interlocked.Read(ref progress.Total);
            Console.Write("Total = {0:n0}, Current = {1:n0}, Percents = {2:P} \r", total, current, ((double)current) / total);
            if (progress.Done)
            {
                break;
            }
            Thread.Sleep(delay);
        }
        Console.WriteLine();
    }, TaskCreationOptions.LongRunning);

    projects.ForEach(p => produceCSFiles.Post(p));
    produceCSFiles.Complete();
    getMatchingLines.Completion.GetAwaiter().GetResult();
    progress.Done = true;
    progressTask.GetAwaiter().GetResult();

    res.SaveAsJson(outFilePath);
}
The default parameters are (https://github.com/MarkKharitonov/LearningTPLDataFlow/blob/master/FindStringCmd.cs#L24-L28):
private int m_maxDOP1 = 3;
private int m_maxDOP2 = 20;
private int m_maxDOP3 = Environment.ProcessorCount;
private int m_maxDOP4 = Environment.ProcessorCount;
private int m_workSize = 1_000_000;
My idea is to divide the work into work items, where a work item's size is computed by multiplying the number of lines it covers by the count of the string literals. So, if a C# file contains 500 lines, then searching it for all the 3401 literals amounts to work of size 3401 * 500 = 1,700,500.
The unit of work defaults to 1,000,000, so in the aforementioned example the file would result in 2 work items. It is the responsibility of the produceWorkItems block to generate these work items from files.
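For illustration, here is a minimal sketch of how such splitting could work. The real YieldWorkItems lives in CSFile.cs in the repository; the signature and the rounding details below are assumptions, not the actual implementation:

static IEnumerable<(CSFile CSFile, int Pos, int Length)> YieldWorkItemsSketch(
    CSFile csFile, int lineCount, int literalCount, int workSize)
{
    // How many lines fit into one work item without exceeding the work size
    // (work size = covered lines * number of literals).
    int linesPerItem = Math.Max(1, workSize / literalCount);
    for (int pos = 0; pos < lineCount; pos += linesPerItem)
    {
        // Each item covers a contiguous range of lines within a single file.
        yield return (csFile, pos, Math.Min(linesPerItem, lineCount - pos));
    }
}

Under this sketch the 500-line file from the example splits into two items of weight 999,894 and 700,606 (294 and 206 lines respectively).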
Example runs:
C:\work\TPLDataFlow [master ≡]> .\bin\Debug\net5.0\TPLDataFlow.exe find-string -d C:\xyz\tip -o c:\temp -l C:\temp\2.txt
Locating all the instances of the 4 literals found in the file C:\temp\2.txt in the C# code ...
Total = 49,844,516, Current = 49,702,532, Percents = 99.72%
Elapsed: 00:00:18.4320676
C:\work\TPLDataFlow [master ≡]> .\bin\Debug\net5.0\TPLDataFlow.exe find-string -d C:\xyz\tip -o c:\temp -l C:\temp\1.txt
Locating all the instances of the 3401 literals found in the file c:\temp\1.txt in the C# code ...
Total = 42,379,095,775, Current = 42,164,259,870, Percents = 99.49%
Elapsed: 01:44:13.4289270
Question
Many work items are undersized. If I have 3 C# files, 20 lines each, my current code would produce 3 work items, because in my current implementation work items never cross a file boundary. This is inefficient. Ideally, they would be batched into a single work item, because 60 * 3401 = 204,060 < 1,000,000. But the BatchBlock cannot be used here, because it expects me to provide the batch size, which I do not know - it depends on the work items in the pipeline.
How would you achieve such batching?
Regarding the first question (configuring the pipeline), I can't really offer any guidance. Optimizing the parameters of a dataflow pipeline seems like a black art to me!
Regarding the second question (how to batch a workload consisting of work items having unknown size at compile time), you could use the custom BatchBlock<T> below. It uses the DataflowBlock.Encapsulate method in order to combine two dataflow blocks into one. The first block is an ActionBlock<T> that receives the input and puts it into a buffer, and the second is a BufferBlock<T[]> that holds the batched items and propagates them downstream. The weightSelector is a lambda that returns the weight of each received item. When the accumulated weight surpasses the batchWeight threshold, a batch is emitted.
public static IPropagatorBlock<T, T[]> CreateDynamicBatchBlock<T>(
    int batchWeight, Func<T, int> weightSelector,
    DataflowBlockOptions options = null)
{
    // Arguments validation omitted
    options ??= new DataflowBlockOptions();
    var outputBlock = new BufferBlock<T[]>(options);
    List<T> buffer = new List<T>();
    int sumWeight = 0;

    var inputBlock = new ActionBlock<T>(async item =>
    {
        checked
        {
            int weight = weightSelector(item);
            if (weight + sumWeight > batchWeight && buffer.Count > 0)
                await SendBatchAsync();
            buffer.Add(item);
            sumWeight += weight;
            if (sumWeight >= batchWeight) await SendBatchAsync();
        }
    }, new()
    {
        BoundedCapacity = options.BoundedCapacity,
        CancellationToken = options.CancellationToken,
        TaskScheduler = options.TaskScheduler,
        MaxMessagesPerTask = options.MaxMessagesPerTask,
        NameFormat = options.NameFormat
    });

    PropagateCompletion(inputBlock, outputBlock, async () =>
    {
        if (buffer.Count > 0) await SendBatchAsync();
    });

    Task SendBatchAsync()
    {
        var batch = buffer.ToArray();
        buffer.Clear();
        sumWeight = 0;
        return outputBlock.SendAsync(batch);
    }

    static async void PropagateCompletion(IDataflowBlock source,
        IDataflowBlock target, Func<Task> postCompletionAction)
    {
        try { await source.Completion.ConfigureAwait(false); } catch { }
        Exception ex =
            source.Completion.IsFaulted ? source.Completion.Exception : null;
        try { await postCompletionAction(); }
        catch (Exception actionError) { ex = actionError; }
        if (ex != null) target.Fault(ex); else target.Complete();
    }

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);
}
Usage example:
var batchBlock = CreateDynamicBatchBlock<WorkItem>(1_000_000, wi => wi.Size);
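If you wanted to plug this into the pipeline from the question, the batch block could be linked between the produceWorkItems and produceMatchingLines blocks, with the downstream block reworked to accept arrays. A rough sketch under the question's types follows; the weight formula (covered lines multiplied by the literal count) is taken from the question, while the wiring itself is illustrative rather than a drop-in replacement:

var batchWorkItems = CreateDynamicBatchBlock<(CSFile CSFile, int Pos, int Length)>(
    workSize, wi => wi.Length * literals.Length);

// The downstream block now consumes whole batches instead of single work items.
var produceMatchingLines = new TransformManyBlock<(CSFile CSFile, int Pos, int Length)[], MatchingLine>(
    batch => batch.SelectMany(o => o.CSFile.YieldMatchingLines(literals, o.Pos, o.Length, progress)),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDOP4,
        TaskScheduler = taskSchedulerPair.ConcurrentScheduler
    });

produceWorkItems.LinkTo(batchWorkItems, linkOptions);
batchWorkItems.LinkTo(produceMatchingLines, linkOptions);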
If the weight int type doesn't have enough range and overflows, you could switch to long or double.