Just wondering about the MaxDegreeOfParallelism setting in the dataflow library. I was writing a unit test to ensure that the number of parallel tasks didn't exceed the specified amount, when I found that it is indeed sometimes exceeded. My question is: am I doing something wrong to measure it, or is the setting only a "best attempt" to limit the parallelism?
using System.Threading.Tasks.Dataflow;
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, _) => cts.Cancel();
var cancellationToken = cts.Token;
var messageCount = 10_000;
var maxDegreeOfParallelism = 100;
var concurrentlyConsuming = new bool[messageCount];
using var semaphore = new SemaphoreSlim(1, 1);
const int maxDelayMs = 10;
var consumeMessageBlock = new TransformBlock<int, int>(async x =>
{
    concurrentlyConsuming[x] = true;
    await semaphore.WaitAsync(cancellationToken);
    int actualParallelism;
    try
    {
        actualParallelism = concurrentlyConsuming.Count(isRunning => isRunning);
    }
    finally
    {
        semaphore.Release();
    }
    await Task.Delay(Random.Shared.Next(0, maxDelayMs), cancellationToken);
    concurrentlyConsuming[x] = false;
    return actualParallelism;
}, new ExecutionDataflowBlockOptions
{
    CancellationToken = cancellationToken,
    MaxDegreeOfParallelism = maxDegreeOfParallelism,
});
var output = new List<int>();
var collectOutput = new ActionBlock<int>(output.Add);
consumeMessageBlock.LinkTo(collectOutput, new DataflowLinkOptions { PropagateCompletion = true });
for (var i = 0; i < messageCount; i++) await consumeMessageBlock.SendAsync(i, cancellationToken);
consumeMessageBlock.Complete();
await collectOutput.Completion;
Console.WriteLine(string.Join(",", output.Where(actualParallelism => actualParallelism > maxDegreeOfParallelism)));
If you run the example (version 6.0.0 of System.Threading.Tasks.Dataflow) you'll see output such as:
103,101,101
The MaxDegreeOfParallelism setting in TPL Dataflow is a hard limit, not a best-effort one. Your attempt to measure the parallelism experimentally with the concurrentlyConsuming array of booleans has undefined behavior, because it accesses shared state without synchronization. Synchronizing only the reads and not the writes of shared state is as effective as not synchronizing anything at all.
An easy way to measure the concurrency is to increment a counter when the processing of a message starts, and decrement it when the processing completes. For example:
int concurrencyCounter = 0;
TransformBlock<int, int> consumeMessageBlock = new(async x =>
{
    int concurrency = Interlocked.Increment(ref concurrencyCounter);
    try
    {
        await Task.Delay(Random.Shared.Next(0, 100));
        return concurrency;
    }
    finally { Interlocked.Decrement(ref concurrencyCounter); }
}, new() { MaxDegreeOfParallelism = 100 });
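To turn the counter idea into something closer to the unit test from the question, here is a minimal, self-contained sketch. It assumes .NET 6 or later (for Random.Shared and top-level statements) and the System.Threading.Tasks.Dataflow package. The names limit, messageCount, measureBlock, collectBlock and observed are illustrative and not part of any API. The limit, message count and 10 ms maximum delay are taken from the question rather than from the snippet above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

const int limit = 100;            // assumed limit, same value as in the question
const int messageCount = 10_000;  // assumed message count, same value as in the question

int concurrencyCounter = 0;

// Each invocation atomically increments the counter on entry and decrements
// it on exit, so the value returned by Increment is the number of handlers
// that were in flight at that moment (including this one).
var measureBlock = new TransformBlock<int, int>(async _ =>
{
    int concurrency = Interlocked.Increment(ref concurrencyCounter);
    try
    {
        await Task.Delay(Random.Shared.Next(0, 10));
        return concurrency;
    }
    finally { Interlocked.Decrement(ref concurrencyCounter); }
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = limit });

// The collecting block keeps the default MaxDegreeOfParallelism of 1,
// so adding to a plain List<int> from it is safe.
var observed = new List<int>();
var collectBlock = new ActionBlock<int>(observed.Add);
measureBlock.LinkTo(collectBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (var i = 0; i < messageCount; i++) measureBlock.Post(i);
measureBlock.Complete();
await collectBlock.Completion;

Console.WriteLine($"Max observed concurrency: {observed.Max()} (limit: {limit})");
```

If the setting is the hard limit described above, the printed maximum should never exceed the limit. In a test, that last line could become an assertion that observed.Max() is less than or equal to limit.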