c# task-parallel-library tpl-dataflow

TPL DataFlow unable to handle an exception in ActionBlock


I'm trying to send a copy of each message from one ActionBlock<int> to multiple consumers, which are also ActionBlock<int> instances. This works well, but if one of the target blocks throws an exception, the exception does not seem to propagate to the source block. Here is how I try to handle the exception, but execution never reaches either catch block:

static void Main(string[] args)
{
    var t1 = new ActionBlock<int>(async i =>
    {
        await Task.Delay(2000);
        Trace.TraceInformation($"target 1 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });

    var t2 = new ActionBlock<int>(async i =>
    {
        await Task.Delay(1000);
        Trace.TraceInformation($"target 2 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });

    var t3 = new ActionBlock<int>(async i =>
    {
        await Task.Delay(100);
        Trace.TraceInformation($"target 3 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
        if (i > 5)
            throw new Exception("Too big number");
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });

    var targets = new [] { t1, t2, t3};

    var broadcaster = new ActionBlock<int>(
        async item =>
        {
            var processingTasks = targets.Select(async t =>
            {
                try
                {
                    await t.SendAsync(item);
                }
                catch
                {
                    Trace.TraceInformation("handled in select"); // never goes here
                }
            });

            try
            {
                await Task.WhenAll(processingTasks);
            }
            catch
            {
                Trace.TraceInformation("handled"); // never goes here
            }
        });

    for (var i = 1; i <= 10; i++)
        broadcaster.Post(i);
}

I'm not sure what I'm missing here, but I would like to be able to retrieve the exception and find out which target block has faulted.


Solution

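  • Once a block enters a faulted state it no longer accepts new items. The exception it threw is attached to its Completion task and, if the block is linked in a pipeline, propagated along with its completion (this requires PropagateCompletion to be set on the link). SendAsync itself does not throw when the target has faulted; it returns false. To observe the exception, await the block's Completion whenever the block refuses an item: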

    var processingTasks = targets.Select(async t =>
    {
        try
        {
            if(!await t.SendAsync(item))
                await t.Completion;
        }
        catch
        {
            Trace.TraceInformation("handled in select"); // reached once the target has faulted
        }
    });
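
The question also asks which target block faulted. One way to get that, sketched below under some assumptions, is to record the index of every target that declines an item together with the exception from its Completion task. The names BroadcastSketch and SendToAllAsync are made up for this illustration and are not part of TPL Dataflow; only SendAsync and Completion come from the library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BroadcastSketch
{
    // Hypothetical helper: offers `item` to every target concurrently and
    // returns a map from target index to the exception of each target that
    // turned out to be faulted. An empty map means no fault was observed.
    public static async Task<IDictionary<int, Exception>> SendToAllAsync(
        IReadOnlyList<ActionBlock<int>> targets, int item)
    {
        var faults = new ConcurrentDictionary<int, Exception>();

        var sends = targets.Select(async (target, index) =>
        {
            try
            {
                // SendAsync returns false when the block declines the item,
                // which is what a completed or faulted block does.
                if (!await target.SendAsync(item))
                    await target.Completion; // rethrows if the block faulted
            }
            catch (Exception ex)
            {
                // Remember which target failed and why.
                faults[index] = ex;
            }
        });

        await Task.WhenAll(sends);
        return faults;
    }
}
```

Inside the broadcaster's delegate this could be called as `var faults = await BroadcastSketch.SendToAllAsync(targets, item);`, after which each entry can be logged with its index. Note that a fault is only noticed on the first send after the target has faulted: items the target had already accepted into its queue are discarded when it faults, and the sends that delivered them still returned true.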