I'm using the TPL Dataflow library in .NET 4.7.1. For some reason, my program never terminates when I have await Console.Out.WriteLineAsync(DateTime.Now.TimeOfDay.ToString()); inside the async lambda passed to the ActionBlock constructor. It just outputs a stream of DateTime.Now.TimeOfDay.ToString() lines and then stops at a random point, never reaching Console.WriteLine("Time elapsed:" + watch.Elapsed);, though in some cases I have observed that the console outputs "Finished Reading the file".
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    public static async Task Main(string[] args)
    {
        int numberOfLines = 0;
        Console.WriteLine("Number of cores used:" + Convert.ToInt32(Math.Ceiling((Environment.ProcessorCount * 0.75) * 2.0)));
        BufferBlock<string> queBuffer = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 100000 });
        var processingBlock = new ActionBlock<string>(async inputline =>
        {
            Interlocked.Increment(ref numberOfLines);
            //Line that causes issue
            //await Console.Out.WriteLineAsync(DateTime.Now.TimeOfDay.ToString());
        }
        , new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 48,
            SingleProducerConstrained = true,
            BoundedCapacity = 500
        });
        queBuffer.LinkTo(processingBlock);
        //Start
        var watch = System.Diagnostics.Stopwatch.StartNew();
        Console.WriteLine("Processing started at:" + DateTime.Now);
        if (File.Exists(args[0]))
        {
            using (StreamReader sr = new StreamReader(args[0]))
            {
                string line;
                // Read and display lines from the file until the end of the file is reached.
                while ((line = await sr.ReadLineAsync()) != null)
                {
                    await queBuffer.SendAsync(line);
                }
            }
            await Console.Out.WriteLineAsync("Finished Reading the file");
        }
        queBuffer.Complete();
        processingBlock.Complete();
        await Task.WhenAll(queBuffer.Completion, processingBlock.Completion);
        watch.Stop();
        Console.WriteLine("Time elapsed:" + watch.Elapsed);
        Console.WriteLine("Number of lines read:" + numberOfLines.ToString());
    }
}
However, if I take out the line that causes the issue, the program works and reads all the lines from the text file:
W:\test>.\CompressAMS.exe token2-small.txt
Number of cores used:24
Processing started at:12/17/2018 6:32:50 PM
Finished Reading the file
Time elapsed:00:00:00.3569824
Number of lines read:100000
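What you do have is a race condition at completion. You're calling Complete() on both blocks, which forces the processing block to stop accepting messages while the buffer may still have data to pass along. Then, when you await both blocks, the buffer can never complete if it hasn't delivered all of its messages, so execution hangs after "Finished Reading the file". Awaiting the console write inside the ActionBlock makes each item slower to process, so with BoundedCapacity = 500 the buffer is far more likely to still be holding lines when Complete() is called; without that line, the processing block usually drains the buffer first, which is why the race rarely shows up.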
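The race can be reproduced in isolation with a small sketch (not the original program). It assumes a project that references System.Threading.Tasks.Dataflow (the NuGet package on .NET Framework); the class and method names are hypothetical, Task.Delay(100) stands in for the awaited console write, and the 2-second timeout is an arbitrary choice so the demo reports the hang instead of blocking forever. A slow ActionBlock with BoundedCapacity = 1 is linked to a BufferBlock without PropagateCompletion, and Complete() is called on both blocks immediately, as in the question.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionRaceDemo
{
    // Returns true if the buffer's Completion finishes within the timeout, false if it is stuck.
    public static async Task<bool> BufferCompletesAsync(int itemCount, TimeSpan timeout)
    {
        var buffer = new BufferBlock<int>();

        // Slow, bounded consumer; Task.Delay stands in for the awaited console write.
        var slow = new ActionBlock<int>(
            async _ => await Task.Delay(100),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });

        // Same pattern as the question: the link does NOT propagate completion.
        buffer.LinkTo(slow);

        for (int i = 0; i < itemCount; i++)
        {
            buffer.Post(i); // the buffer is unbounded, so Post always accepts
        }

        // Complete both blocks right away, as the question's code does.
        // The ActionBlock now declines everything still waiting in the buffer.
        buffer.Complete();
        slow.Complete();

        // The consumer finishes whatever it already accepted and completes...
        await slow.Completion;

        // ...but the buffer still holds items, so its Completion never finishes.
        Task first = await Task.WhenAny(buffer.Completion, Task.Delay(timeout));
        return first == buffer.Completion;
    }

    public static async Task Main()
    {
        bool completed = await BufferCompletesAsync(10, TimeSpan.FromSeconds(2));
        Console.WriteLine("Buffer completed: " + completed);
    }
}
```

Running it should print "Buffer completed: False": the ActionBlock itself completes, but the items it declined stay in the BufferBlock, which only completes once it has been drained.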
You can safely await both blocks, but call Complete() only on the buffer and let TPL Dataflow (TDF) propagate the completion to your downstream processing block:
queBuffer.LinkTo(processingBlock, new DataflowLinkOptions() { PropagateCompletion = true });
// ... read the file and SendAsync each line exactly as before ...
queBuffer.Complete();
await Task.WhenAll(queBuffer.Completion, processingBlock.Completion);
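Putting the fix together, here is a minimal self-contained sketch of the corrected pipeline. It is an illustration under stated assumptions rather than the asker's program: an in-memory array of lines replaces the file, await Task.Yield() stands in for the awaited console write, MaxDegreeOfParallelism = 4 is arbitrary, and the class and method names are hypothetical. The essential parts are the PropagateCompletion = true link and calling Complete() only on the buffer.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PropagateCompletionDemo
{
    // Sends every line through buffer -> processing and returns how many lines were processed.
    public static async Task<int> ProcessAllAsync(string[] lines)
    {
        int processed = 0;
        var buffer = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 100000 });
        var processing = new ActionBlock<string>(async line =>
        {
            Interlocked.Increment(ref processed);
            await Task.Yield(); // stand-in for the awaited console write
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 500 });

        // Completing the buffer will now also complete the processing block,
        // but only after the buffer has handed over every message.
        buffer.LinkTo(processing, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (string line in lines)
        {
            await buffer.SendAsync(line);
        }

        // Complete only the head of the pipeline, then wait for both blocks.
        buffer.Complete();
        await Task.WhenAll(buffer.Completion, processing.Completion);
        return processed;
    }

    public static async Task Main()
    {
        // In-memory input instead of reading a file, so the sketch runs on its own.
        string[] lines = Enumerable.Range(0, 10000).Select(i => "line " + i).ToArray();
        int count = await ProcessAllAsync(lines);
        Console.WriteLine("Number of lines read:" + count);
    }
}
```

This prints "Number of lines read:10000". Because the processing block is only told to complete once the buffer's own Completion has finished, no line can be stranded in the buffer, regardless of how slow the per-line work is.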