I am playing around with TPL Dataflow and trying to learn how to use it. I have found plenty of examples showing how to use the different blocks, but none of them really explain how to handle exceptions.
My main question is how to continue a foreach loop if an exception occurs, or if the output of a previous transform block is not what I expect. Below is a simple Windows Forms application I am using to test with. It's just a single button that loops through a list of numbers and displays them.
I've added an if statement to the action block that throws an exception when the number equals 5. The loop appears to keep running after the exception is thrown, but the block stops writing output from that point on. The exception also never reaches the catch clause in the foreach loop.
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading.Tasks.Dataflow;
    using System.Windows.Forms;

    namespace DataFlowsTest
    {
        public partial class Form1 : Form
        {
            public Form1()
            {
                InitializeComponent();
            }

            private void button1_Click(object sender, EventArgs e)
            {
                List<int> TestList = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
                var actionBlock = new ActionBlock<int>(item =>
                {
                    if (item == 5)
                        throw new Exception("Blech.");
                    Debug.WriteLine(item.ToString());
                });
                foreach (var number in TestList)
                {
                    try
                    {
                        actionBlock.Post(number);
                    }
                    catch (AggregateException ex)
                    {
                        Debug.WriteLine(ex.Message);
                        continue;
                    }
                }
                actionBlock.Complete();
            }
        }
    }
This code returns:

    1
    2
    3
    4
    Exception thrown: 'System.Exception' in DataFlowsTest.exe
    An exception of type 'System.Exception' occurred in DataFlowsTest.exe but was not handled in user code
    Blech.
Here's how I implemented it. I can share a lot more on GitHub if this approach interests you. I use Dataflow quite a lot, so I have implemented a whole lot of other IDataFlow classes based on this approach.
Essentially, by wrapping every message in a class called Flow<T>, we can implement a Railway Oriented approach. A flow has two states: Failure or Success. Successful Flow<T>s are passed to the next custom dataflow block, while failed ones are routed to a FailureBlock : ITargetBlock<IFlow> (essentially an ActionBlock<IFlow> that deals with exceptions, logging, etc.).
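For context on why failures are wrapped rather than thrown: when a block's delegate throws, the block faults, drops whatever is still queued, and reports the exception through its Completion task rather than through Post. The following is a minimal sketch of that behaviour using the question's numbers. FaultDemo and RunAsync are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class; not part of the original question or answer.
public static class FaultDemo
{
    // Posts 1..10 to a block that throws on 5, then reports which items were
    // processed and which exception faulted the block.
    public static async Task<(List<int> Processed, Exception Error)> RunAsync()
    {
        var processed = new List<int>();
        var block = new ActionBlock<int>(item =>
        {
            if (item == 5)
                throw new Exception("Blech.");
            processed.Add(item);
        });

        for (var number = 1; number <= 10; number++)
        {
            // Post never throws; it only returns false if the block has
            // already stopped accepting messages.
            block.Post(number);
        }

        block.Complete();
        try
        {
            // The delegate's exception surfaces here, not at Post.
            await block.Completion;
            return (processed, null);
        }
        catch (Exception ex)
        {
            return (processed, ex);
        }
    }
}
```

Calling `await FaultDemo.RunAsync()` should give the processed items 1 to 4 and an exception whose message is "Blech.", which matches the output shown in the question: everything after the faulting item is discarded.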
My basic Flow class looks as follows:
    public class Flow<T> : IFlow
    {
        // Named Data because the constructor and FlowBlock both refer to Data.
        public T Data { get; private set; }
        public Exception Exception { get; private set; }
        public bool Success => Exception is null;
        public bool Failure => !Success;
        public void Fail(Exception exception) => Exception = exception;
        public Flow(T value) => Data = value;
        public Flow(Exception exception) => Fail(exception);
        // Non-generic factory: redeclaring <T> here would shadow the class's type parameter.
        public static Flow<T> FromValue(T data) => new Flow<T>(data);
    }

    public interface IFlow
    {
        bool Success { get; }
        bool Failure { get; }
        Exception Exception { get; }
        void Fail(Exception exception);
    }
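Applied to the question's example, the same idea can be sketched with the built-in blocks alone. This is an illustration under assumptions, not the implementation described in this answer: MiniFlow<T> is a cut-down, hypothetical stand-in for Flow<T> so the snippet compiles on its own, and RailwayDemo is just a name for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical, trimmed-down stand-in for Flow<T>.
public sealed class MiniFlow<T>
{
    public T Data { get; }
    public Exception Exception { get; }
    public bool Success => Exception is null;
    public MiniFlow(T data) => Data = data;
    public MiniFlow(Exception exception) => Exception = exception;
}

public static class RailwayDemo
{
    public static async Task<(List<int> Ok, List<string> Errors)> RunAsync()
    {
        var ok = new List<int>();
        var errors = new List<string>();

        // Catch inside the delegate so the block never faults;
        // a failure becomes an ordinary message.
        var transform = new TransformBlock<int, MiniFlow<int>>(item =>
        {
            try
            {
                if (item == 5)
                    throw new Exception("Blech.");
                return new MiniFlow<int>(item);
            }
            catch (Exception ex)
            {
                return new MiniFlow<int>(ex);
            }
        });

        var successBlock = new ActionBlock<MiniFlow<int>>(flow => ok.Add(flow.Data));
        var failureBlock = new ActionBlock<MiniFlow<int>>(flow => errors.Add(flow.Exception.Message));

        // Predicate-filtered links act as the two "rails".
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        transform.LinkTo(successBlock, linkOptions, flow => flow.Success);
        transform.LinkTo(failureBlock, linkOptions, flow => !flow.Success);

        for (var number = 1; number <= 10; number++)
            transform.Post(number);

        transform.Complete();
        await Task.WhenAll(successBlock.Completion, failureBlock.Completion);
        return (ok, errors);
    }
}
```

With this arrangement the item that throws ends up on the failure rail, and the items after it are still processed on the success rail.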
The following part looks scary, but don't let it put you off. It's essentially a TransformBlock wrapper with two extra features:
1. Each custom FlowBlock<TInput, TOutput> wraps its transform in a try { } catch { }.
2. Its LinkTo method links successful flows to the next block and failures to the FailureBlock.
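    // Requires: using System; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow;
    public class FlowBlock<TInput, TOutput> : IPropagatorBlock<Flow<TInput>, Flow<TOutput>>
    {
        private ITargetBlock<Flow<TInput>> Target => TransformBlock;
        private ISourceBlock<Flow<TOutput>> Source => TransformBlock;
        private TransformBlock<Flow<TInput>, Flow<TOutput>> TransformBlock { get; }
        private FailureBlock FailureBlock { get; }

        public FlowBlock(
            Func<TInput, Task<TOutput>> transform,
            ExecutionDataflowBlockOptions dataflowBlockOptions,
            FailureBlock failureBlock)
        {
            FailureBlock = failureBlock;
            TransformBlock = new TransformBlock<Flow<TInput>, Flow<TOutput>>(
                async inFlow =>
                {
                    try
                    {
                        return new Flow<TOutput>(await transform(inFlow.Data));
                    }
                    catch (Exception exception)
                    {
                        // Turn the exception into a failed flow so the block itself never faults.
                        return new Flow<TOutput>(exception);
                    }
                },
                dataflowBlockOptions);
        }

        // Successful flows go to the next block; failed flows are diverted to the FailureBlock.
        public IDisposable LinkTo(
            ITargetBlock<Flow<TOutput>> target,
            DataflowLinkOptions linkOptions)
            => new Disposable(
                Source.LinkTo(target, linkOptions, flow => flow.Success),
                Source.LinkTo<Flow<TOutput>>(FailureBlock, linkOptions, flow => flow.Failure));

        // The remaining IPropagatorBlock members simply delegate to the inner TransformBlock.
        public Task Completion => TransformBlock.Completion;
        public void Complete() => TransformBlock.Complete();
        public void Fault(Exception exception) => Target.Fault(exception);
        public DataflowMessageStatus OfferMessage(DataflowMessageHeader header, Flow<TInput> value,
            ISourceBlock<Flow<TInput>> source, bool consumeToAccept)
            => Target.OfferMessage(header, value, source, consumeToAccept);
        public Flow<TOutput> ConsumeMessage(DataflowMessageHeader header,
            ITargetBlock<Flow<TOutput>> target, out bool messageConsumed)
            => Source.ConsumeMessage(header, target, out messageConsumed);
        public bool ReserveMessage(DataflowMessageHeader header, ITargetBlock<Flow<TOutput>> target)
            => Source.ReserveMessage(header, target);
        public void ReleaseReservation(DataflowMessageHeader header, ITargetBlock<Flow<TOutput>> target)
            => Source.ReleaseReservation(header, target);

        // Minimal stand-in for the Disposable helper, which the original snippet does not show.
        private sealed class Disposable : IDisposable
        {
            private readonly IDisposable[] links;
            public Disposable(params IDisposable[] links) => this.links = links;
            public void Dispose() { foreach (var link in links) link.Dispose(); }
        }
    }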
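The FailureBlock itself is not shown here. A minimal sketch of what it might look like, assuming it simply forwards each failed flow's exception to a handler, is below. The constructor signature, the FailedFlow helper and FailureBlockDemo are hypothetical, and IFlow is a trimmed copy of the interface above so the snippet compiles on its own.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Trimmed copy of the IFlow interface, repeated only for self-containment.
public interface IFlow
{
    bool Success { get; }
    bool Failure { get; }
    Exception Exception { get; }
}

// Hypothetical FailureBlock: an ITargetBlock<IFlow> backed by an ActionBlock<IFlow>.
public sealed class FailureBlock : ITargetBlock<IFlow>
{
    private readonly ActionBlock<IFlow> inner;

    // The handler is where logging or other exception handling would go.
    public FailureBlock(Action<Exception> handle)
        => inner = new ActionBlock<IFlow>(flow => handle(flow.Exception));

    public Task Completion => inner.Completion;
    public void Complete() => inner.Complete();

    // ActionBlock implements Fault and OfferMessage explicitly, hence the casts.
    public void Fault(Exception exception) => ((IDataflowBlock)inner).Fault(exception);

    public DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader, IFlow messageValue,
        ISourceBlock<IFlow> source, bool consumeToAccept)
        => ((ITargetBlock<IFlow>)inner).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
}

public static class FailureBlockDemo
{
    // Hypothetical failed flow used only to exercise the block.
    private sealed class FailedFlow : IFlow
    {
        public FailedFlow(Exception exception) => Exception = exception;
        public bool Success => false;
        public bool Failure => true;
        public Exception Exception { get; }
    }

    public static async Task<string> RunAsync()
    {
        string logged = null;
        var failureBlock = new FailureBlock(ex => logged = ex.Message);

        IFlow flow = new FailedFlow(new Exception("Blech."));
        failureBlock.Post(flow);

        failureBlock.Complete();
        await failureBlock.Completion;
        return logged;
    }
}
```

Because ITargetBlock<in TInput> is contravariant, a block declared as ITargetBlock<IFlow> can be linked as the target for any Flow<T>, which is what lets a single FailureBlock serve every FlowBlock in a pipeline.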
Let me know in the comments if you are interested and I'll gladly open a GitHub repo with a whole lot more details.