Tags: c#, task-parallel-library, .net-4.5, tpl-dataflow

TPL Dataflow pipe constantly running inside service


For example, I have three blocks:

       Buffer -> Transform -> Action 

I'm running a Web API service that posts data from incoming requests to the buffer block. How can I create a pipeline that keeps running forever, without calling Complete() on the Action block and thereby stopping the whole pipeline?


Solution

  • If you need the pipeline to live for the lifetime of the application rather than a single request, you could use a static class to hold it. There's not necessarily any need to call Complete() on the action block. Depending on your needs, another option is to separate the application from the processing pipeline, connecting the two through a database or a message queue, or simply running them as separate server-side applications.

    @svick has a good point about using a TaskCompletionSource to signal when the pipeline has finished with a particular item. Putting it all together, here's a quick sample that might be helpful:

    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public class Controller {

        // Posts the value into the long-lived pipeline and waits for that item's result.
        public async Task<int> PostToPipeline(int inputValue) {
            var message = new MessageIn(inputValue);
            MyPipeline.InputBuffer.Post(message);
            return await message.Completion.Task;
        }
    }

    public class MessageIn {
        public MessageIn(int value) {
            InputValue = value;
            // Completed by the action block once this item has been processed.
            Completion = new TaskCompletionSource<int>();
        }

        public int InputValue { get; set; }
        public TaskCompletionSource<int> Completion { get; set; }
    }

    public class MessageProcessed {
        public int ProcessedValue { get; set; }
        public TaskCompletionSource<int> Completion { get; set; }
    }

    // Static holder: the pipeline is built once per process and never completed.
    public static class MyPipeline {

        public static BufferBlock<MessageIn> InputBuffer { get; private set; }
        private static TransformBlock<MessageIn, MessageProcessed> transform;
        private static ActionBlock<MessageProcessed> action;

        static MyPipeline() {
            BuildPipeline();
            LinkPipeline();
        }

        static void BuildPipeline() {
            // BufferBlock is unbounded by default, so Post() accepts every request;
            // the bounded blocks downstream pull from it as they free up capacity.
            InputBuffer = new BufferBlock<MessageIn>();

            // The explicit casts pick the synchronous delegate overloads and avoid
            // ambiguity with the Task-returning constructor overloads.
            transform = new TransformBlock<MessageIn, MessageProcessed>((Func<MessageIn, MessageProcessed>)TransformMessage, new ExecutionDataflowBlockOptions() {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 10
            });

            action = new ActionBlock<MessageProcessed>((Action<MessageProcessed>)CompletedProcessing, new ExecutionDataflowBlockOptions() {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 10
            });
        }

        static void LinkPipeline() {
            InputBuffer.LinkTo(transform, new DataflowLinkOptions() { PropagateCompletion = true });
            transform.LinkTo(action, new DataflowLinkOptions() { PropagateCompletion = true });
        }

        static MessageProcessed TransformMessage(MessageIn message) {
            return new MessageProcessed() {
                // InputValue + 1 rather than InputValue++, which would return the
                // old value and also mutate the incoming message.
                ProcessedValue = message.InputValue + 1,
                Completion = message.Completion
            };
        }

        static void CompletedProcessing(MessageProcessed message) {
            // Resumes the PostToPipeline call that is awaiting this item.
            message.Completion.SetResult(message.ProcessedValue);
        }
    }
    

    There are a few ways to coordinate the completion of a specific job within the pipeline; awaiting the completion source may be the best approach for your needs.
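A pipeline that has to run forever also needs to handle failures carefully. If a block's delegate throws, that block faults and stops accepting messages. With PropagateCompletion the fault also flows to the linked downstream block, and anything posted afterwards is never processed. So a single bad request could stop the whole pipeline.

Below is a minimal sketch of one way around this. It catches exceptions per item and reports them through that item's TaskCompletionSource, so only the caller that sent the bad input sees the failure. A few assumptions apply:

- The `Job` type, the `Compute` step and `ResilientPipeline.ProcessAsync` are hypothetical names, not part of the answer above.
- The code targets .NET Framework 4.6+ or modern .NET, because it uses TaskCreationOptions.RunContinuationsAsynchronously.
- System.Threading.Tasks.Dataflow is available to the project. On .NET Framework it comes from the NuGet package of that name.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical job type: input, computed output, and the
// TaskCompletionSource the caller (e.g. a Web API action) awaits.
public sealed class Job
{
    public Job(int input) => Input = input;

    public int Input { get; }
    public int Output { get; set; }

    // RunContinuationsAsynchronously keeps the awaiting caller from
    // resuming inline on a dataflow worker thread.
    public TaskCompletionSource<int> Result { get; } =
        new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public static class ResilientPipeline
{
    // Built once per process and never completed during normal operation.
    private static readonly BufferBlock<Job> input = new BufferBlock<Job>();
    private static readonly TransformBlock<Job, Job> transform;
    private static readonly ActionBlock<Job> action;

    static ResilientPipeline()
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            BoundedCapacity = 10
        };

        transform = new TransformBlock<Job, Job>(job =>
        {
            try
            {
                job.Output = Compute(job.Input);
            }
            catch (Exception ex)
            {
                // Fail only this caller's task. Letting the exception escape
                // would fault the block and stop the whole pipeline.
                job.Result.TrySetException(ex);
            }
            return job;
        }, options);

        // TrySetResult does nothing for jobs that already failed above.
        action = new ActionBlock<Job>(job => { job.Result.TrySetResult(job.Output); }, options);

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        input.LinkTo(transform, link);
        transform.LinkTo(action, link);
    }

    // Hypothetical processing step that rejects some inputs.
    private static int Compute(int value)
    {
        if (value < 0) throw new ArgumentOutOfRangeException(nameof(value));
        return value + 1;
    }

    // What a controller action would await for each request.
    public static Task<int> ProcessAsync(int value)
    {
        var job = new Job(value);
        if (!input.Post(job))
            throw new InvalidOperationException("The pipeline is no longer accepting messages.");
        return job.Result.Task;
    }
}

public static class Program
{
    public static async Task Main()
    {
        Console.WriteLine(await ResilientPipeline.ProcessAsync(41)); // prints 42

        try
        {
            await ResilientPipeline.ProcessAsync(-1);
        }
        catch (ArgumentOutOfRangeException)
        {
            Console.WriteLine("bad input rejected");
        }

        // The pipeline is still running after the failed item.
        Console.WriteLine(await ResilientPipeline.ProcessAsync(1)); // prints 2
    }
}
```

Running Main prints 42, then "bad input rejected", then 2. The failed request gets its own exception while the next request is processed normally.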
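The question also asks about Complete(). While the service is running, nothing needs to call it. When the host shuts down, though, it can be useful to drain items that are still in flight.

Below is a minimal sketch of that, with these assumptions:

- `LongRunningPipeline` is a hypothetical wrapper. It swaps the answer's static class for a single instance kept for the process lifetime.
- A trivial doubling step stands in for real work.
- StopAsync would be called from whatever shutdown hook your host provides.
- System.Threading.Tasks.Dataflow is available to the project.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper: one instance lives for the whole process
// (for example registered as a singleton). Complete() is called only in StopAsync.
public sealed class LongRunningPipeline
{
    private readonly BufferBlock<int> buffer = new BufferBlock<int>();
    private readonly TransformBlock<int, int> transform;
    private readonly ActionBlock<int> action;

    // Stand-in for real work: records what reached the end of the pipeline.
    public ConcurrentQueue<int> Processed { get; } = new ConcurrentQueue<int>();

    public LongRunningPipeline()
    {
        transform = new TransformBlock<int, int>(x => x * 2);
        action = new ActionBlock<int>(x => Processed.Enqueue(x));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        buffer.LinkTo(transform, link);
        transform.LinkTo(action, link);
    }

    // Called for every incoming request; nothing completes the pipeline here.
    public bool Post(int value) => buffer.Post(value);

    // Called once on shutdown: stop accepting input, then wait until every
    // item already posted has gone through the action block.
    public Task StopAsync()
    {
        buffer.Complete();
        return action.Completion;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var pipeline = new LongRunningPipeline();
        for (int i = 1; i <= 5; i++)
            pipeline.Post(i);

        await pipeline.StopAsync();
        Console.WriteLine(pipeline.Processed.Count); // prints 5
        Console.WriteLine(pipeline.Post(6));         // prints False
    }
}
```

The pipeline is completed exactly once, at shutdown. Dataflow blocks cannot be restarted after completion, which is why Post returns false afterwards.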