Tags: .net, task-parallel-library, tpl-dataflow

How do I open and then close a connection based on tasks in a TPL Dataflow pipeline?


The setup

I have a pipeline processing many similar Tasks. It has three segments in order, A, then B, then C (each of which may have multiple stages). A and C are local to the machine, B is remote.

[Figure: three-stage pipeline; the middle stage, B, is remote]

Tasks in section B, the remote section, need a connection to work. They should all share one connection.

The question

I want to create the connection when the first task moves from A to B, and close it when the last task moves from B to C. How do I do this?

The constraints

If it makes the solution easier I can accept one or more of the following constraints:

  • Segment B has only one stage.
  • Only one task in segment B at a time.
  • Queues (buffers) or any other blocks may be inserted between A and B or between B and C.

Solution

  • This is how I would do it:

    • Have a connection variable somewhere, initially set to null.
    • Whenever an item is processed, the connection is created if needed and then used.
    • After the block has completed (which you can detect using its Completion property, assuming you propagate completion properly), Dispose() the connection if it has been created.

    In code, as a helper method, it could look something like this:

    public static TransformBlock<TInput, TOutput>
        CreateTransformBlockWithConnection<TInput, TOutput, TConnection>(
        Func<TInput, TConnection, TOutput> transform,
        Func<TConnection> connectionFactory)
        where TConnection : class, IDisposable
    {
        TConnection connection = null;
    
        var block = new TransformBlock<TInput, TOutput>(
            input =>
            {
                if (connection == null)
                    connection = connectionFactory();
    
                return transform(input, connection);
            });
    
        block.Completion.ContinueWith(
            _ =>
            {
                if (connection != null)
                    connection.Dispose();
            });
    
        return block;
    }
    

    (Note that this won't behave correctly if connectionFactory ever returns null. If you're worried about that, you can add a check for it.)
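The helper above relies on the block's default MaxDegreeOfParallelism of 1, so only one item at a time can reach the null check. A minimal sketch of a variant that stays safe with higher parallelism, and that fails fast on a null connection, could use Lazy<T>. The names ConnectionBlocks and CreateTransformBlockWithLazyConnection are made up for this illustration, and the sketch assumes the connection itself tolerates concurrent use:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ConnectionBlocks
{
    // Hypothetical variant of the helper above; not part of TPL Dataflow.
    public static TransformBlock<TInput, TOutput>
        CreateTransformBlockWithLazyConnection<TInput, TOutput, TConnection>(
        Func<TInput, TConnection, TOutput> transform,
        Func<TConnection> connectionFactory,
        ExecutionDataflowBlockOptions options)
        where TConnection : class, IDisposable
    {
        // Lazy<T> (default thread-safety mode) runs the factory at most once,
        // even when several items ask for the connection at the same time.
        var connection = new Lazy<TConnection>(
            () => connectionFactory()
                  ?? throw new InvalidOperationException("connectionFactory returned null."));

        var block = new TransformBlock<TInput, TOutput>(
            input => transform(input, connection.Value), options);

        // Dispose only if some item actually created the connection.
        block.Completion.ContinueWith(_ =>
        {
            if (connection.IsValueCreated)
                connection.Value.Dispose();
        });

        return block;
    }
}
```

If the factory throws, the first item fails and the block ends up faulted; the continuation then finds no connection to dispose.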

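    Usage example (this executes each string input as a SQL command and returns the result; connectionString is a placeholder for your own connection string):

    var block = CreateTransformBlockWithConnection(
        (string input, SqlConnection connection) =>
            new SqlCommand(input, connection).ExecuteScalar(),
        () => {
            var connection = new SqlConnection(connectionString); // placeholder
            connection.Open(); // ExecuteScalar needs an open connection
            return connection;
        });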
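
To show what "propagate completion properly" means for the A, B, C layout in the question, here is a rough end-to-end sketch. FakeConnection, CreateWithConnection and RunAsync are invented for this example: the fake connection only records events in a log, and the helper is a variant of the one above that also returns the cleanup task so the caller can wait for Dispose() to finish.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for a real connection: it only records what happens to it.
sealed class FakeConnection : IDisposable
{
    private readonly ConcurrentQueue<string> _log;
    public FakeConnection(ConcurrentQueue<string> log) { _log = log; _log.Enqueue("open"); }
    public string Send(int item) { _log.Enqueue($"send {item}"); return $"result {item}"; }
    public void Dispose() => _log.Enqueue("close");
}

static class Program
{
    // Variant of the helper above that also exposes the cleanup continuation.
    static (TransformBlock<TIn, TOut> Block, Task Cleanup) CreateWithConnection<TIn, TOut, TConn>(
        Func<TIn, TConn, TOut> transform, Func<TConn> factory)
        where TConn : class, IDisposable
    {
        TConn connection = null;
        var block = new TransformBlock<TIn, TOut>(input =>
        {
            connection ??= factory(); // created lazily by the first item
            return transform(input, connection);
        });
        Task cleanup = block.Completion.ContinueWith(_ => connection?.Dispose());
        return (block, cleanup);
    }

    public static async Task<string[]> RunAsync(int itemCount)
    {
        var log = new ConcurrentQueue<string>();
        var a = new TransformBlock<int, int>(x => x);                   // segment A (local)
        var (b, cleanup) = CreateWithConnection<int, string, FakeConnection>(
            (x, conn) => conn.Send(x), () => new FakeConnection(log)); // segment B (remote)
        var c = new ActionBlock<string>(_ => { });                      // segment C (local)

        // Without PropagateCompletion, B would never complete and never dispose.
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        a.LinkTo(b, link);
        b.LinkTo(c, link);

        for (int i = 1; i <= itemCount; i++) a.Post(i);
        a.Complete();

        await c.Completion; // all items have left the pipeline
        await cleanup;      // the connection, if any, has been disposed
        return log.ToArray();
    }

    static async Task Main() => Console.WriteLine(string.Join(", ", await RunAsync(3)));
}
```

The log starts with a single "open" written when the first item reaches B and ends with "close" after the last item has left B. If no items are posted, the connection is never created and the log stays empty.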