
Task Fails to Run


I wrote a small utility to read large text files and search for lines that contain a search term. I'm using this as an opportunity to learn TPL Dataflow.

The code works fine, unless the search term is near the very end of the file. In that case, the uiResult action block does not get called unless there is a breakpoint in it.

My understanding is that data is posted to uiResult from searcher, after which searcher becomes complete (it has processed its last block of data). Since data has been posted to uiResult, it should not become complete until after that data is processed.

Question

Why is it that uiResult becomes complete even though data is posted to it (unless a breakpoint was set in uiResult)?

Code

Here's the relevant code, as trimmed back as possible:

ActionBlock<LineInfo> uiResult = new ActionBlock<LineInfo>(li =>
    {
        // If match found near end of file, the following line only runs
        // if a breakpoint is set on it:
        if (results != null) results.Add(li);
    },
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1,
        CancellationToken = cancelSource.Token,
        TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
    });

BatchBlock<LineInfo> batcher = new BatchBlock<LineInfo>(5000); 

ActionBlock<LineInfo[]> searcher = new ActionBlock<LineInfo[]>(lines =>
    {
        foreach (LineInfo li in lines)
        {
            if (li.TextOfLine.Contains(searchTerm))
            {
                uiResult.Post(li);
            }
        }
    },
    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1,
        CancellationToken = cancelSource.Token
    });

batcher.LinkTo(searcher);

batcher.Completion.ContinueWith(t =>
{
    if (t.IsFaulted) ((IDataflowBlock)searcher).Fault(t.Exception);
    else searcher.Complete();

    if (t.IsFaulted) ((IDataflowBlock)uiResult).Fault(t.Exception);
    else uiResult.Complete();
});

Task.Run(() =>
    {
        using (FileStream fs = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
        using (BufferedStream bs = new BufferedStream(fs))
        using (StreamReader sr = new StreamReader(bs))
        {
            string line;
            while ((line = sr.ReadLine()) != null && cancelSource.IsCancellationRequested == false)
            {
                batcher.Post(new LineInfo() { LineNumber = lineNumber, OffsetOfLine = offset, TextOfLine = line });
            }

            batcher.Complete();
            try
            {
                searcher.Completion.Wait();
                uiResult.Completion.Wait();
            }
            catch (AggregateException ae)
            {
                TaskCanceledException taskCancelled = ae.InnerException as TaskCanceledException;
                if (taskCancelled == null)
                {
                    // Rethrow anything that is not just a user cancellation;
                    // a user cancellation is swallowed
                    throw;
                }
            }
            finally
            {
                signalDone();
            }
        }
    });

Solution

  • Your code is non-deterministic because of the way you're handling completion. A possible sequence of events is this:

    1. The Task processes the whole file and calls Complete() on batcher.
    2. batcher processes the last batch, sends it to searcher and completes.
    3. The continuation is executed, which calls Complete() on both searcher and uiResult.
    4. Since uiResult has no work to do, it completes.
    5. searcher processes the last batch, trying to send each result to uiResult. But uiResult is already completed, so it rejects everything. This means Post() returns false, but you're not checking for that.

    So the problem is that you're trying to send something to a block that already completed, which doesn't work.
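    The difference between the expectation in the question and the behavior described above can be reproduced without any file or UI. The following is a minimal standalone sketch, assuming a C# 9+ project (top-level statements) that references the System.Threading.Tasks.Dataflow package; the integer items are placeholders. An item accepted before Complete() is still processed, while an item offered after Complete() is declined and Post() returns false.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Collects every item the block actually processes
// (default MaxDegreeOfParallelism is 1, so List<T> is safe here).
var processed = new List<int>();
var block = new ActionBlock<int>(i => processed.Add(i));

// Accepted before Complete(): the item is buffered and will still be processed.
bool acceptedBefore = block.Post(1);

block.Complete();

// Offered after Complete(): the block now declines all new messages,
// so Post() returns false and the item is dropped unless the result is checked.
bool acceptedAfter = block.Post(2);

// Completion finishes only once the already-accepted item has been processed.
await block.Completion;

Console.WriteLine($"before: {acceptedBefore}, after: {acceptedAfter}, processed: {string.Join(",", processed)}");
// prints "before: True, after: False, processed: 1"
```

    So the reasoning in the question holds only for items that reach uiResult before its Complete() call; in the order of events above, the matches arrive after it.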

    The solution is to call Complete() on a block only after the block before it has completed (i.e. after its Completion task has finished). Probably the easiest way to do that is to link the blocks with LinkTo() and set PropagateCompletion = true in the DataflowLinkOptions passed to it.
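    In the question's code, searcher is an ActionBlock that calls uiResult.Post() itself, so there is no link between searcher and uiResult to put PropagateCompletion on. A minimal sketch of one way around that, assuming it is acceptable to turn searcher into a TransformManyBlock whose output is the matching lines, is shown below. Plain strings stand in for LineInfo, an in-memory list stands in for the file, and the search term "needle" and the line data are hypothetical. The UI TaskScheduler and the CancellationToken are left out because this runs as a console program (C# 9+, System.Threading.Tasks.Dataflow package).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical input: the only match is on the very last line,
// which is the case that failed in the question.
const string searchTerm = "needle";
var lines = Enumerable.Range(1, 12_000)
    .Select(n => n == 12_000 ? $"line {n} has the needle" : $"line {n}")
    .ToList();

var results = new List<string>();

// Same batching as the original BatchBlock.
var batcher = new BatchBlock<string>(5000);

// The matches become this block's output, so it can be linked to uiResult
// instead of posting to it manually.
var searcher = new TransformManyBlock<string[], string>(
    batch => batch.Where(text => text.Contains(searchTerm)),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

// Stand-in for uiResult (no UI synchronization context in a console app).
var uiResult = new ActionBlock<string>(
    match => results.Add(match),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

// With PropagateCompletion, each target is completed (or faulted) only after
// its source has finished and delivered all of its output.
var propagate = new DataflowLinkOptions { PropagateCompletion = true };
batcher.LinkTo(searcher, propagate);
searcher.LinkTo(uiResult, propagate);

foreach (var line in lines)
    batcher.Post(line);

batcher.Complete();

// Awaiting the last block is enough: it cannot complete before the upstream blocks.
await uiResult.Completion;

Console.WriteLine(string.Join(Environment.NewLine, results));
// prints "line 12000 has the needle"
```

    If searcher has to stay an ActionBlock, an alternative in the same spirit is to complete uiResult from a continuation on searcher.Completion rather than on batcher.Completion, so that uiResult is completed only after searcher has finished posting to it.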