Tags: c#, task-parallel-library, tpl-dataflow

ActionBlock B never receives item returned by TransformBlock A


I am having a problem with a TPL Dataflow mesh in my C#/WPF app. The first input item (called a "Job") always goes all the way through the chain to the final TPL block. But the remaining jobs never arrive at the final block (#4), even though log statements clearly show them being successfully returned from block #3.

Here is the mesh. It is set up once and stored in a private member of my view-model class.

// 1. _meshStartBlock:  On UI thread.   This block always works fine.

_meshStartBlock = new TransformBlock<Job, Job>(job =>
{
    Jobs.Add(job);
    Fire(_scanCapturedTrigger, job);  // Notify state machine.
    Log.Debug("Started: " + job.Name);
    return job;
},
new ExecutionDataflowBlockOptions
{
    CancellationToken = TokenSource.Token,
    TaskScheduler = UiTaskScheduler   // Run on UI thread (because it edits
                                      // our ObservableCollection)
});

// 2. createBlock:  This block also always works fine.

var createBlock = new TransformBlock<Job, Job>(job =>
{
    job.CreateScan();          // Saves some disk files
    job.CreateThumbnail(true); // Creates and saves a thumbnail image.
    Log.Debug("Created: " + job.Name);
    return job;
},
new ExecutionDataflowBlockOptions
{ CancellationToken = TokenSource.Token, MaxDegreeOfParallelism = 1 });


// 3. processBlock - do heavy work in parallel
// This block succeeds for all 3 jobs but 2nd and 3rd returned jobs never
// reach the next block.

var processBlock = new TransformBlock<Job, Job>(job =>
{
    try
    {
        Log.Debug("Processing: " + job.Name);
        job.AlignImages();            // heavy image processing
        job.Generate3d();             // heavy 3d math
        job.FindShapes();             // more heavy math
        job.GetContext().Scan.Save(); // save disk files
        Log.Debug("Processing succeeded: " + job.Name);
    }
    catch (Exception e)
    {
        // Include the exception so the cause is not lost.
        Log.Error("Processing failed: " + job.Name, e);
    }

    // *** THIS LOG STATEMENT SHOWS UP FOR ALL 3 JOBS ***

    Log.Debug("Leaving process block: " + job.Name);

    return job;
},
new ExecutionDataflowBlockOptions
{ CancellationToken = TokenSource.Token, MaxDegreeOfParallelism = 3 });


// 4. doneBlock: Cleans up.
// Since we schedule this on the UI thread it should not be heavy.

var doneBlock = new ActionBlock<Job>(job =>
{
    // *** ONLY REACHED BY JOB 1 ***  

    Log.Debug("Done: " + job.Name);
    Fire(Trigger.ScanProcessed);    // Notify State Machine
},
new ExecutionDataflowBlockOptions
{ CancellationToken = TokenSource.Token,  TaskScheduler = UiTaskScheduler });

// Set up the mesh.  Link the blocks together to form a chain.

_meshStartBlock.LinkTo(createBlock);
createBlock.LinkTo(processBlock);
processBlock.LinkTo(doneBlock);

return _meshStartBlock;

This is the log output I get:

Started: Job1
Created: Job1
Started: Job2
Processing: Job1
Created: Job2
Processing: Job2
Started: Job3
Created: Job3
Processing: Job3
Processing succeeded: Job1
Leaving process block: Job1
Done: Job1
Processing succeeded: Job2
Leaving process block: Job2
Processing succeeded: Job3
Leaving process block: Job3

The Debug window does not report any exceptions during processing, nor does it show error messages of any kind.

I should note that I am forced to run this as a Release build; in a Debug build the process block takes hours. Also, the CancellationToken is never canceled.

Can any TPL Dataflow gurus tell me how I can diagnose what might be happening to Job2 and Job3? Is there any way I can get TPL Dataflow to tell me what happened to my jobs?
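One way to see where Job2 and Job3 end up is to log each block's buffer counts and completion status, for example from a timer or a debug button. The sketch below is a minimal, self-contained console illustration with ints in place of Job. `MeshDiagnostics` and `Describe` are hypothetical helpers; `InputCount`, `OutputCount` and `Completion` are real members of `TransformBlock<TInput,TOutput>` and `ActionBlock<T>`. It assumes you can reach the block instances (in the question, `createBlock`, `processBlock` and `doneBlock` are locals, so they would have to be kept in fields).

```csharp
// Requires a reference to System.Threading.Tasks.Dataflow (NuGet package of the same name).
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class MeshDiagnostics
{
    // Hypothetical helper: one-line snapshot of a TransformBlock.
    // InputCount  = items waiting to be transformed.
    // OutputCount = results produced but not yet accepted by any linked target.
    public static string Describe<TIn, TOut>(string name, TransformBlock<TIn, TOut> block) =>
        $"{name}: in={block.InputCount}, out={block.OutputCount}, status={block.Completion.Status}";

    // An ActionBlock has no output buffer, so only InputCount and status apply.
    public static string Describe<T>(string name, ActionBlock<T> block) =>
        $"{name}: in={block.InputCount}, status={block.Completion.Status}";

    public static void Main()
    {
        var processBlock = new TransformBlock<int, int>(x => x * 2);
        var doneBlock = new ActionBlock<int>(x => Console.WriteLine("Done: " + x));

        // Deliberately left unlinked so the results pile up in processBlock's output buffer.
        processBlock.Post(1);
        processBlock.Post(2);
        SpinWait.SpinUntil(() => processBlock.OutputCount == 2, TimeSpan.FromSeconds(5));

        Console.WriteLine(Describe("processBlock", processBlock));
        Console.WriteLine(Describe("doneBlock", doneBlock));
    }
}
```

In the question's mesh, a reading with `out` greater than 0 for `processBlock` while `doneBlock` reports `status=Faulted` would mean the jobs were produced but the final block has stopped accepting them.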


Solution

  • It may help to attach error handlers to the blocks, to log the exceptions as soon as they happen. Here is an example of a simple generic error handler:

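    // async void on purpose: fire-and-forget, it only logs if the block faults.
    public static async void OnErrorLog(IDataflowBlock block)
    {
        try
        {
            // Completion is faulted as soon as the block's delegate throws.
            await block.Completion.ConfigureAwait(false);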
        }
        catch (Exception ex)
        {
            Log.Error($"{block.GetType().Name} failed", ex);
        }
    }
    

    You can adapt it to your liking.

    Usage example:

    OnErrorLog(_meshStartBlock);
    OnErrorLog(createBlock);
    OnErrorLog(processBlock);
    OnErrorLog(doneBlock);
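To see why attaching such handlers matters here, the following self-contained sketch reproduces one plausible cause of the symptom. It assumes (this is not confirmed in the question) that `doneBlock` throws right after logging "Done: Job1", for example from `Fire(...)`; the failure is simulated and ints stand in for Job. A faulted `ActionBlock` declines every later message, so the remaining items stay in `processBlock`'s output buffer, `processBlock` never completes, and the exception sits in `doneBlock.Completion` where nothing sees it unless something like `OnErrorLog` awaits it. `StuckItemsRepro` is a hypothetical name.

```csharp
// Requires a reference to System.Threading.Tasks.Dataflow (NuGet package of the same name).
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class StuckItemsRepro
{
    // Returns how many items are left in processBlock's output buffer
    // and whether processBlock managed to complete.
    public static async Task<(int stuck, bool processCompleted)> RunAsync()
    {
        var processBlock = new TransformBlock<int, int>(job => job);

        // Stand-in for doneBlock: throws after handling job 1 (simulated failure).
        var doneBlock = new ActionBlock<int>(job =>
        {
            Console.WriteLine("Done: Job" + job);
            throw new InvalidOperationException("Simulated failure in doneBlock");
        });

        processBlock.LinkTo(doneBlock); // no PropagateCompletion, as in the question

        processBlock.Post(1);
        try { await doneBlock.Completion; } // this is what OnErrorLog does
        catch (InvalidOperationException ex) { Console.WriteLine("doneBlock faulted: " + ex.Message); }

        // Jobs 2 and 3 are transformed, but the faulted doneBlock declines them.
        processBlock.Post(2);
        processBlock.Post(3);
        processBlock.Complete();

        SpinWait.SpinUntil(() => processBlock.OutputCount == 2, TimeSpan.FromSeconds(5));
        // processBlock cannot complete while its output buffer is non-empty.
        bool completed = processBlock.Completion.Wait(TimeSpan.FromMilliseconds(500));
        return (processBlock.OutputCount, completed);
    }

    public static void Main()
    {
        var (stuck, completed) = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine($"Items stuck in processBlock: {stuck}, processBlock completed: {completed}");
    }
}
```

Linking with `new DataflowLinkOptions { PropagateCompletion = true }` would additionally carry a fault from an earlier block down to `doneBlock.Completion`, but completion only flows downstream, so a failure inside `doneBlock` itself still has to be observed on `doneBlock.Completion`.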