Tags: c#, async-await, task-parallel-library, tpl-dataflow

How to stop processing pipeline on faulty block?


How can I stop the processing of Dataflow blocks when one of the blocks decides that an error has occurred, so that the following blocks do not run? I thought a block could throw an exception, but I am not sure what the proper way is to stop the rest of the pipeline.

UPDATE:

private async void buttonDataFlow_Click(object sender, EventArgs e)
{
    var cells = objectListView.CheckedObjects.Cast<Cell>().ToList();
    // ToList() never returns null, so check for an empty selection instead.
    if (cells.Count == 0)
        return;

    var blockPrepare = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(Prepare),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

    var blockPreparationFeedback = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(PreparationFeedback),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

    var blockTestMover = new ActionBlock<Cell>(new Func<Cell, Task>(TestMover),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

    blockPrepare.LinkTo(blockPreparationFeedback, new DataflowLinkOptions { PropagateCompletion = true });
    blockPreparationFeedback.LinkTo(blockTestMover, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (Cell c in cells)
    {
        var progressHandler = new Progress<string>(value =>
        {
            c.Status = value;
        });

        c.Progress = progressHandler as IProgress<string>;
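        // SendAsync waits while the bounded block is full; Post would return false and drop the item.
        await blockPrepare.SendAsync(c);
    }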

    blockPrepare.Complete();
    try
    {
        await blockTestMover.Completion;
    }
    catch(Exception ee)
    {
        Console.WriteLine(ee.Message);
    }

    Console.WriteLine("Done");
}

UPDATE 2:

    public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
        Func<TInput, Task> action,
        Action<Exception> exceptionHandler,
        ExecutionDataflowBlockOptions dataflowBlockOptions)
    {
        return new ActionBlock<TInput>(async input =>
        {
            try
            {
                await action(input);
            }
            catch (Exception ex)
            {
                exceptionHandler(ex);
            }
        }, dataflowBlockOptions);
    }

Solution

  • If you want an exception in a block to mean that the current item does not go any further in the pipeline, while processing of the other items continues without interruption, you can create a block that produces one item when processing succeeds and zero items when an exception is thrown:

    public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform,
        Action<Exception> exceptionHandler,
        ExecutionDataflowBlockOptions dataflowBlockOptions)
    {
        return new TransformManyBlock<TInput, TOutput>(async input =>
        {
            try
            {
                var result = await transform(input);
                return new[] { result };
            }
            catch (Exception ex)
            {
                exceptionHandler(ex);
    
                return Enumerable.Empty<TOutput>();
            }
        }, dataflowBlockOptions);
    }
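
To see how the helper above behaves, here is a minimal sketch that is not part of the original answer. The class name SkipFaultedItemsDemo, both demo methods, and the rule that multiples of 3 fail are hypothetical and exist only for illustration. RunAsync pushes a few integers through the exception-catching block and shows that the failing items are dropped while the pipeline still completes normally. RunPlainBlockAsync shows the contrasting case from the question: a plain TransformBlock whose delegate throws becomes faulted, and with PropagateCompletion = true that fault reaches the last block.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SkipFaultedItemsDemo
{
    // Same helper as in the answer, made static so the sketch is self-contained.
    public static IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform,
        Action<Exception> exceptionHandler,
        ExecutionDataflowBlockOptions dataflowBlockOptions)
    {
        return new TransformManyBlock<TInput, TOutput>(async input =>
        {
            try
            {
                var result = await transform(input);
                return new[] { result };
            }
            catch (Exception ex)
            {
                exceptionHandler(ex);
                return Enumerable.Empty<TOutput>();
            }
        }, dataflowBlockOptions);
    }

    // Hypothetical workload: multiples of 3 throw, everything else is doubled.
    public static async Task<(List<int> Results, int ErrorCount)> RunAsync(IEnumerable<int> inputs)
    {
        var errors = new ConcurrentQueue<Exception>();
        var results = new List<int>();

        var transform = CreateExceptionCatchingTransformBlock<int, int>(
            async n =>
            {
                await Task.Yield();
                if (n % 3 == 0)
                    throw new InvalidOperationException($"Cannot process {n}");
                return n * 2;
            },
            errors.Enqueue,
            new ExecutionDataflowBlockOptions());

        // Default options process one item at a time, so the List needs no lock.
        var collect = new ActionBlock<int>(n => results.Add(n));
        transform.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var n in inputs)
            await transform.SendAsync(n);

        transform.Complete();
        await collect.Completion; // completes normally because no exception escaped a block

        return (results, errors.Count);
    }

    // Contrast: an exception that escapes a plain TransformBlock faults the block,
    // and PropagateCompletion = true passes that fault on to the linked block.
    public static async Task<Exception> RunPlainBlockAsync()
    {
        var transform = new TransformBlock<int, int>(n =>
        {
            if (n == 2)
                throw new InvalidOperationException("Cannot process 2");
            return n;
        });
        var sink = new ActionBlock<int>(n => { });
        transform.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (var n = 1; n <= 5; n++)
            transform.Post(n); // returns false once the block has faulted
        transform.Complete();

        try { await sink.Completion; }
        catch (Exception) { /* the fault is inspected below */ }

        // Flatten() unwraps any nested AggregateException added along the way.
        return sink.Completion.Exception?.Flatten().InnerException;
    }
}
```

Calling SkipFaultedItemsDemo.RunAsync(Enumerable.Range(1, 6)) should give the results 2, 4, 8, 10 and an error count of 2, since 3 and 6 are swallowed by the handler. RunPlainBlockAsync should return the InvalidOperationException thrown for item 2, which is what you would rely on if the goal is to stop the whole pipeline rather than skip single items.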