I wrote a small utility to read large text files and search for lines that contain a search term. I'm using this as an opportunity to learn TPL Dataflow.
The code works fine, unless the search term is near the very end of the file. In that case, the uiResult action block does not get called unless there is a breakpoint in it.
My understanding is that data is posted to uiResult from searcher, after which searcher becomes complete (it has processed its last block of data). Since data has been posted to uiResult, it should not become complete until after that data is processed.
Question
Why is it that uiResult becomes complete even though data is posted to it (unless a breakpoint was set in uiResult)?
Code
Here's the relevant code, as trimmed back as possible:
ActionBlock<LineInfo> uiResult = new ActionBlock<LineInfo>(li =>
    {
        // If match found near end of file, the following line only runs
        // if a breakpoint is set on it:
        if (results != null) results.Add(li);
    },
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1,
        CancellationToken = cancelSource.Token,
        TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
    });
BatchBlock<LineInfo> batcher = new BatchBlock<LineInfo>(5000);
ActionBlock<LineInfo[]> searcher = new ActionBlock<LineInfo[]>(lines =>
    {
        foreach (LineInfo li in lines)
        {
            if (li.TextOfLine.Contains(searchTerm))
            {
                uiResult.Post(li);
            }
        }
    },
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1,
        CancellationToken = cancelSource.Token
    });
batcher.LinkTo(searcher);
batcher.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)searcher).Fault(t.Exception);
    else searcher.Complete();
    if (t.IsFaulted) ((IDataflowBlock)uiResult).Fault(t.Exception);
    else uiResult.Complete();
});
Task.Run(() =>
{
    using (FileStream fs = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
    using (BufferedStream bs = new BufferedStream(fs))
    using (StreamReader sr = new StreamReader(bs))
    {
        string line;
        while ((line = sr.ReadLine()) != null && cancelSource.IsCancellationRequested == false)
        {
            batcher.Post(new LineInfo() { LineNumber = lineNumber, OffsetOfLine = offset, TextOfLine = line });
        }
        batcher.Complete();
        try
        {
            searcher.Completion.Wait();
            uiResult.Completion.Wait();
        }
        catch (AggregateException ae)
        {
            TaskCanceledException taskCancelled = ae.InnerException as TaskCanceledException;
            if (taskCancelled != null)
            {
                // Swallow the Exception if is just a user cancellation
                throw;
            }
        }
        finally
        {
            signalDone();
        }
    }
});
Your code is non-deterministic because of the way you're handling completion. A possible sequence of events is this:
1. The Task processes the whole file and calls Complete() on batcher.
2. batcher processes the last batch, sends it to searcher and completes.
3. The continuation on batcher.Completion runs and calls Complete() on both searcher and uiResult.
4. uiResult has no work to do, so it completes.
5. searcher processes the last batch, trying to send each result to uiResult. But uiResult is already completed, so it rejects everything. This means Post() returns false, but you're not checking for that.
So the problem is that you're trying to send something to a block that has already completed, which doesn't work.
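The rejection described above is easy to reproduce in isolation. The following is a minimal console sketch, not the asker's code: the class name PostAfterCompleteDemo and the Run helper are made up for illustration. It only relies on the documented behaviour that Post() returns false once a block has been told to complete.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; not part of the original utility.
static class PostAfterCompleteDemo
{
    // Returns whether Post() was accepted before and after Complete().
    public static (bool before, bool after) Run()
    {
        var block = new ActionBlock<int>(n => Console.WriteLine($"processed {n}"));

        bool acceptedBefore = block.Post(1);  // block is still accepting input
        block.Complete();                     // from here on, new input is declined
        bool acceptedAfter = block.Post(2);   // silently rejected unless the result is checked

        // Items accepted before Complete() are still processed before Completion finishes.
        block.Completion.Wait();

        return (acceptedBefore, acceptedAfter);
    }

    static void Main()
    {
        var (before, after) = Run();
        Console.WriteLine($"accepted before Complete(): {before}");
        Console.WriteLine($"accepted after Complete():  {after}");
    }
}
```

Checking the return value of Post() in searcher would at least have made the lost results visible, although it would not fix the ordering problem by itself.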
The solution is to call Complete() on a block only after the block before it completes (i.e. its Completion completes). Probably the easiest way to do that is to use PropagateCompletion with LinkTo().
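One way to apply this is sketched below, under some assumptions. LinkTo() needs a source block, and an ActionBlock is not one, so this sketch swaps searcher for a TransformManyBlock that emits the matching lines instead of posting them by hand. The SearchPipeline class, its Search method, and the trimmed-down LineInfo are illustrative names, and cancellation and the UI TaskScheduler from the question are left out to keep it runnable as a console program.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

// Trimmed-down stand-in for the LineInfo type used in the question.
class LineInfo
{
    public int LineNumber { get; set; }
    public string TextOfLine { get; set; }
}

// Hypothetical helper; names and signature are assumptions for this sketch.
static class SearchPipeline
{
    public static List<LineInfo> Search(IEnumerable<string> lines, string searchTerm, int batchSize = 5000)
    {
        var results = new List<LineInfo>();
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };

        var batcher = new BatchBlock<LineInfo>(batchSize);

        // A TransformManyBlock is both a target and a source, so it can be linked onward.
        var searcher = new TransformManyBlock<LineInfo[], LineInfo>(
            batch => batch.Where(li => li.TextOfLine.Contains(searchTerm)));

        // Default MaxDegreeOfParallelism is 1, so results.Add is never called concurrently.
        var uiResult = new ActionBlock<LineInfo>(li => results.Add(li));

        // Each block completes only after its predecessor has completed
        // and all of the predecessor's output has been handed over.
        batcher.LinkTo(searcher, propagate);
        searcher.LinkTo(uiResult, propagate);

        int lineNumber = 0;
        foreach (string line in lines)
        {
            batcher.Post(new LineInfo { LineNumber = ++lineNumber, TextOfLine = line });
        }

        // Only the head of the pipeline is completed explicitly.
        batcher.Complete();
        uiResult.Completion.Wait();
        return results;
    }

    static void Main()
    {
        var hits = Search(new[] { "alpha", "beta", "needle at the end" }, "needle", batchSize: 2);
        foreach (LineInfo hit in hits)
        {
            Console.WriteLine($"{hit.LineNumber}: {hit.TextOfLine}");
        }
    }
}
```

With the links in place, the ContinueWith continuation from the question is no longer needed, and waiting on uiResult.Completion alone is enough because it cannot finish before searcher does. In a UI application the blocking Wait() would normally stay off the UI thread (as in the question's Task.Run) or be replaced by awaiting the Completion task.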