How can I stop the processing of Dataflow blocks when one of the blocks decides that an error has occurred, so that the following blocks do not run? I thought a block could throw an exception, but I'm not sure what the proper way to stop the rest of the pipeline is.
UPDATE:
private async void buttonDataFlow_Click(object sender, EventArgs e)
{
    var cells = objectListView.CheckedObjects.Cast<Cell>().ToList();
    // ToList() never returns null, so check for an empty selection instead.
    if (cells.Count == 0)
        return;

    var blockPrepare = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(Prepare),
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 10000,
            MaxDegreeOfParallelism = Environment.ProcessorCount,
        });
    var blockPreparationFeedback = new TransformBlock<Cell, Cell>(new Func<Cell, Task<Cell>>(PreparationFeedback),
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 10000,
            MaxDegreeOfParallelism = Environment.ProcessorCount,
        });
    var blockTestMover = new ActionBlock<Cell>(new Func<Cell, Task>(TestMover),
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 10000,
            MaxDegreeOfParallelism = Environment.ProcessorCount,
        });

    // Completion (including faults) flows from each block to the next one.
    blockPrepare.LinkTo(blockPreparationFeedback, new DataflowLinkOptions { PropagateCompletion = true });
    blockPreparationFeedback.LinkTo(blockTestMover, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (Cell c in cells)
    {
        var progressHandler = new Progress<string>(value =>
        {
            c.Status = value;
        });
        c.Progress = progressHandler as IProgress<string>;
        blockPrepare.Post(c);
    }
    blockPrepare.Complete();

    try
    {
        await blockTestMover.Completion;
    }
    catch (Exception ee)
    {
        Console.WriteLine(ee.Message);
    }
    Console.WriteLine("Done");
}
UPDATE 2:
public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
    Func<TInput, Task> action,
    Action<Exception> exceptionHandler,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return new ActionBlock<TInput>(async input =>
    {
        try
        {
            await action(input);
        }
        catch (Exception ex)
        {
            exceptionHandler(ex);
        }
    }, dataflowBlockOptions);
}
If what you want is for an exception in a block to mean that the current item does not go any further in the pipeline, while processing of the other items continues without interruption, you can do that by creating a block that produces one item if processing succeeds and zero items when an exception is thrown:
public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    Action<Exception> exceptionHandler,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        try
        {
            var result = await transform(input);
            return new[] { result };
        }
        catch (Exception ex)
        {
            exceptionHandler(ex);
            return Enumerable.Empty<TOutput>();
        }
    }, dataflowBlockOptions);
}
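A rough sketch of how such a block might be wired into a pipeline is shown below. It is an illustration only, not code from the question: the `ExceptionCatchingDemo` class, the `RunAsync` method, the string inputs and the use of `int.Parse` as the failing step are all assumptions made up for this example. The helper is repeated (as a static method) so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; the names here are not part of the original pipeline.
public static class ExceptionCatchingDemo
{
    // Same helper as above, made static so this sketch is self-contained.
    public static IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform,
        Action<Exception> exceptionHandler,
        ExecutionDataflowBlockOptions dataflowBlockOptions)
    {
        return new TransformManyBlock<TInput, TOutput>(async input =>
        {
            try
            {
                var result = await transform(input);
                return new[] { result };
            }
            catch (Exception ex)
            {
                exceptionHandler(ex);
                return Enumerable.Empty<TOutput>();
            }
        }, dataflowBlockOptions);
    }

    // Pushes three strings through the pipeline; the middle one cannot be parsed.
    public static async Task<(List<int> Results, List<string> Errors)> RunAsync()
    {
        var results = new ConcurrentQueue<int>();
        var errors = new ConcurrentQueue<string>();

        var parse = CreateExceptionCatchingTransformBlock<string, int>(
            text => Task.FromResult(int.Parse(text)),   // throws FormatException for "oops"
            ex => errors.Enqueue(ex.Message),           // record the failure, keep the block alive
            new ExecutionDataflowBlockOptions());

        var collect = new ActionBlock<int>(value => results.Enqueue(value));

        parse.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var text in new[] { "1", "oops", "3" })
            await parse.SendAsync(text);
        parse.Complete();

        // Completes normally: the failed item was dropped instead of faulting the block.
        await collect.Completion;
        return (results.ToList(), errors.ToList());
    }

    public static async Task Main()
    {
        var (results, errors) = await RunAsync();
        Console.WriteLine(string.Join(", ", results)); // prints "1, 3"
        Console.WriteLine(errors.Count);               // prints "1"
    }
}
```

In this sketch the item that fails is reported through the handler and never reaches the next block, while the items before and after it flow through as usual. If the goal is instead to stop the whole pipeline, letting the exception escape the block (rather than catching it) faults that block, and with `PropagateCompletion = true` the fault is passed on to the linked blocks.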