Search code examples
c#exceptionpipelinetpl-dataflow

How to shutdown TPL Dataflow on fatal exception, in a gracefully way?


I am using a sequential pipeline build on TPL Dataflow, that consists in 3 blocks:

  • B1 - prepares the message
  • B2 - posts the message to a remote service
  • B3 - saves the result

The problem is how to shutdown the pipeline, when an error like service down occurs. The pipeline must go down in a controlled manner, so no results from B2 are lost.


Solution

  • The solution was simple but took me few rounds before i got it, since there is not much information behind basic library information at Microsoft site.

    Hope it helps someone. The solution can be easily reconfigured to meet other requirements.

    The approach presented relies on:

    • CancellationTokenSource to signal shutdown. Each block, in the case of a fatal exception, should signal a shutdown through a shared CancellationTokenSource object.
    • Blocks that should stop to work immediately after signal should be initialized passing the shared CancellationTokenSource object
    • The program must wait for the final block to end all message processing.

    Here the solution in the Pipeline Class and the test proving it works.

    Here a working example:

    using Microsoft.VisualStudio.TestTools.UnitTesting;
    using System;
    using System.Threading;
    using System.Threading.Tasks.Dataflow;
    using System.Threading.Tasks;
    using System.Diagnostics;
    
    namespace Tests.Sets.Research
    {
        [TestClass]
        public class TPLTest
        {
            public class PipeLine
            {
                CancellationTokenSource cancellationTokenSource;
                TransformBlock<int, int> b1, b2;
                ActionBlock<int> bFinal;
    
                static int SimulateWork(String blockName, int message, CancellationTokenSource cancellationTokenSource)
                {
                    try
                    {
                        Thread.Sleep(100);
                        Trace.WriteLine($"{blockName} processed: {message}");
                    }
                    catch (Exception ex)
                    {
                        Trace.WriteLine($"Fatal error {ex.Message} at {blockName}");
                        cancellationTokenSource.Cancel();
                    }
                    return message;
                }
    
    
                public PipeLine(CancellationTokenSource cancellationTokenSource)
                {
                    this.cancellationTokenSource = cancellationTokenSource;
    
                    // Create three TransformBlock<int, int> objects. 
                    // Each blocks <int, int> object calls the SimulateWork method.
                    Func<string, int, CancellationTokenSource, int> doWork = (name, message, ct) => SimulateWork(name, message, ct);
    
                    b1 = new TransformBlock<int, int>((m1) => doWork("b1", m1, cancellationTokenSource),
                       new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 , CancellationToken = cancellationTokenSource.Token}); //discard messages on  this block if cancel is signaled
                    b2 = new TransformBlock<int, int>((m1) => doWork("b2", m1, cancellationTokenSource), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
                    bFinal = new ActionBlock<int>((m1) => doWork("bFinal", m1, cancellationTokenSource), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
    
                    b1.LinkTo(b2, new DataflowLinkOptions { PropagateCompletion = true });
                    b2.LinkTo(bFinal, new DataflowLinkOptions { PropagateCompletion = true });
                }
    
                internal void Complete()
                {
                    b1.Complete();
                }
    
                public void waifForCompletetion()
                {               
                    Trace.WriteLine($"Waiting for pipeline to end gracefully");
                    bFinal.Completion.Wait();
                    Trace.WriteLine($"Pipeline terminated");               
                }
    
                public void submitToPipe(int message)
                {
                    if (cancellationTokenSource.IsCancellationRequested)
                    {
                        Trace.WriteLine($"Message {message} was rejected. Pipe is shutting down.Throttling meanwhile");
                        return;
                    }
                    b1.SendAsync(message);
                }
            }
    
            [TestMethod]
            public void TestShutdown()
            {
                var cancellationTokenSource = new CancellationTokenSource();
                var pipeLine = new PipeLine(cancellationTokenSource);
    
                //post failure in 2 seconds. 
                //It would be the same if was signal from inside block 2
                Task.Factory.StartNew(async () =>
                {
                    await Task.Delay(2000);
                    Console.WriteLine("Time to shutdown the pipeline!");
                    cancellationTokenSource.Cancel();
                });
    
                //send requests to pipe in background for 5 seconds
                Task.Run(async () =>
                {
                    for (int i = 1; i < 100; i++)
                    {
                        if (cancellationTokenSource.IsCancellationRequested)
                            break;
    
                        Thread.Sleep(50); //to see pipe closing input
                        pipeLine.submitToPipe(i);
                    }
                    pipeLine.Complete();
                });
    
                pipeLine.waifForCompletetion();
            }
        }
    }
    

    Here the result:

    b2 processed: 13
    b1 processed: 22
    Message 45 was rejected. Pipe is shutting down.Throttling meanwhile 
    b2 processed: 14
    bFinal processed: 8
    b2 processed: 15
    bFinal processed: 9
    bFinal processed: 10
    bFinal processed: 11
    bFinal processed: 12
    bFinal processed: 13
    bFinal processed: 14
    bFinal processed: 15
    Pipeline terminated
    

    From the time Message 45 was rejected, no more messages were processed on B1.

    All messages already in B2 queue reached the end of the pipeline.