Search code examples
c#task-parallel-librarytpl-dataflow

Reopen TPL Dataflow input after marking it complete


I'm trying to build a processing pipeline service that users can place items into and then wait for the results to finish being processed. My idea is to register it with DI so that it is injectable.

The problem I'm facing is that after processing the first set of data and marking the input block as complete, it remains closed when I try processing another set of data. Is there a way to reopen the pipeline to allow data processing again?

I'm also using a library on top of TPL Dataflow called DataflowEx.

   /// <summary>
   /// Contract for a processing pipeline that callers fill with items, complete,
   /// and then wait on before reading the processed results.
   /// </summary>
   public interface IPipelineService
   {
        /// <summary>Offers <paramref name="inputObj"/> to the pipeline for processing.</summary>
        /// <param name="inputObj">The item to process.</param>
        Task FillPipeline(object inputObj);

        /// <summary>Returns a task that finishes once the pipeline has finished processing.</summary>
        Task WaitForResults();

        /// <summary>Returns the results collected so far.</summary>
        Task<List<object>> GetResults();

        /// <summary>Discards the results collected so far.</summary>
        Task FlushPipeline();

        /// <summary>Signals that no more items will be added to the pipeline.</summary>
        Task Complete();
   }

   /// <summary>
   /// Two-stage DataflowEx pipeline: a transform block receives each order and an
   /// action block stores the processed order in an in-memory result list.
   /// </summary>
   /// <remarks>
   /// Completion of a TPL Dataflow block is terminal: once <see cref="IPipelineService.Complete"/>
   /// has been called the input block declines every further item and cannot be reopened.
   /// To process another batch, create a new <see cref="Pipeline"/> instance
   /// (for example by resolving the service per batch instead of as a singleton).
   /// </remarks>
   public class Pipeline : Dataflow<object>, IPipelineService
   {
        // The blocks are wired once in the constructor and never replaced.
        private readonly TransformBlock<object, object> _inputBlock;
        private readonly ActionBlock<object> _resultBlock;

        // Replaced wholesale by FlushPipeline; the result block reads this field on every item.
        private List<object> _results = new List<object>();

        /// <summary>Builds the blocks, links them with completion propagation and registers them as children.</summary>
        public Pipeline() : base(DataflowOptions.Default)
        {
            _inputBlock = new TransformBlock<object, object>(obj => Processing.Processing.ReceiveOrder(obj));
            _resultBlock = new ActionBlock<object>(obj => _results.Add(Processing.Processing.ReturnProcessedOrder(obj)));

            // Completing the input block must also complete the result block,
            // otherwise the overall completion task would never finish.
            _inputBlock.LinkTo(_resultBlock, new DataflowLinkOptions() { PropagateCompletion = true });

            RegisterChild(_inputBlock);
            RegisterChild(_resultBlock);
        }

        /// <summary>Sends <paramref name="inputObj"/> into the pipeline.</summary>
        /// <param name="inputObj">The item to process.</param>
        /// <exception cref="System.InvalidOperationException">
        /// The input block declined the item, which happens once the pipeline has been completed.
        /// </exception>
        public async Task FillPipeline(object inputObj)
        {
            // SendAsync yields false when the target permanently declines the message
            // (i.e. after Complete()). Surface that instead of silently dropping the item.
            bool accepted = await _inputBlock.SendAsync(inputObj);
            if (!accepted)
            {
                throw new System.InvalidOperationException(
                    "The pipeline has already been completed and cannot accept more items; create a new Pipeline instance.");
            }
        }

        /// <summary>Waits until every registered block has completed.</summary>
        public async Task WaitForResults()
        {
            await this.CompletionTask;
        }

        /// <summary>Returns the current result list (the live instance, not a copy).</summary>
        public Task<List<object>> GetResults()
        {
            return Task.FromResult(_results);
        }

        /// <summary>Drops the collected results by swapping in a new, empty list.</summary>
        public Task FlushPipeline()
        {
            _results = new List<object>();
            return Task.CompletedTask;
        }

        /// <summary>Marks the input block as complete; no further items are accepted afterwards.</summary>
        Task IPipelineService.Complete()
        {
            InputBlock.Complete();
            return Task.CompletedTask;
        }

        /// <summary>The block that receives items fed into this dataflow.</summary>
        public override ITargetBlock<object> InputBlock { get { return _inputBlock; } }

        /// <summary>The current result list, exposed as <see cref="object"/>.</summary>
        public object Result { get { return _results; } }
    }

This is the basic example I'm working with at the moment to test this idea.

This is how I want to use it, and I want to be able to feed more items into it after it has finished processing the first set.

// Feed four orders into the pipeline service, one per loan type.
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.HomeLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.OtherLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.PersonalLoan).order);
await _pipelineService.FillPipeline(new GenerateOrder(OrderType.CarLoan).order);
// Signal that no more items are coming, then wait for processing to finish.
await _pipelineService.Complete();
await _pipelineService.WaitForResults();

Solution

  • You can't restart a completed dataflow set - I just reset my objects to start again (in this case I call ResetDataFlow in CompleteAsync())

    /// <summary>
    /// Front end for a <see cref="DownloadDataFlow"/> that can be reused for several batches:
    /// each call to <see cref="CompleteAsync"/> drains the current blocks and then rebuilds them.
    /// </summary>
    public class DownloadConnector
    {
        /// <summary>The dataflow whose blocks currently receive items.</summary>
        public DownloadDataFlow DataFlow { get; set; }

        /// <summary>Creates the connector and its underlying dataflow.</summary>
        /// <param name="maxDop">Maximum degree of parallelism forwarded to the dataflow.</param>
        public DownloadConnector(int maxDop)
        {
            DataFlow = new DownloadDataFlow(maxDop);
        }

        /// <summary>Sends <paramref name="item"/> to the head (buffer) block of the current dataflow.</summary>
        /// <param name="item">The item to enqueue.</param>
        public async Task SendAsync(DownloadItem item)
        {
            await DataFlow.BufferBlock.SendAsync(item);
        }

        /// <summary>
        /// Completes the current batch, waits for the last block to finish and then
        /// resets the dataflow so the connector can accept a new batch.
        /// </summary>
        /// <remarks>
        /// If a block faulted, the exception is rethrown to the caller, but the dataflow
        /// is still reset so the connector does not stay stuck on completed blocks.
        /// </remarks>
        public async Task CompleteAsync()
        {
            // Capture the instance so the same dataflow is completed, awaited and reset
            // even if the DataFlow property is reassigned while awaiting.
            var flow = DataFlow;

            flow.BufferBlock.Complete();
            try
            {
                await flow.ActionBlock.Completion;
            }
            finally
            {
                // Completed blocks cannot be reopened; replace them with fresh ones.
                flow.ResetDataFlow();
            }
        }
    }
    
    /// <summary>
    /// Holds a three-block chain (buffer → transform → action) and can rebuild it on demand,
    /// because completed dataflow blocks cannot be restarted.
    /// </summary>
    public class DownloadDataFlow
    {
        /// <summary>Head of the chain; items are sent here.</summary>
        public BufferBlock<DownloadItem> BufferBlock { get; set; }

        /// <summary>Middle block; runs <see cref="DownloadAsync"/> for each item.</summary>
        public TransformBlock<DownloadItem, DownloadItem> TransformBlock { get; set; }

        /// <summary>Tail of the chain; runs <see cref="OnCompletion"/> for each item.</summary>
        public ActionBlock<DownloadItem> ActionBlock { get; set; }

        /// <summary>Maximum degree of parallelism applied to <see cref="ActionBlock"/>.</summary>
        public int MaxDop { get; set; }

        /// <summary>Stores <paramref name="maxDop"/> and builds the initial set of blocks.</summary>
        /// <param name="maxDop">Maximum degree of parallelism for the action block.</param>
        public DownloadDataFlow(int maxDop)
        {
            MaxDop = maxDop;
            ResetDataFlow();
        }

        /// <summary>
        /// Replaces all three blocks with new instances and links them with completion propagation.
        /// </summary>
        /// <returns>This instance, to allow chaining.</returns>
        public DownloadDataFlow ResetDataFlow()
        {
            BufferBlock = new BufferBlock<DownloadItem>();
            TransformBlock = new TransformBlock<DownloadItem, DownloadItem>(DownloadAsync);
            // NOTE(review): MaxDop is only applied to the action block; the transform block
            // keeps the default options. Confirm that this is intended.
            ActionBlock = new ActionBlock<DownloadItem>(OnCompletion, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxDop });

            // Propagate completion down the chain so that completing the buffer
            // eventually completes the action block.
            BufferBlock.LinkTo(TransformBlock, new DataflowLinkOptions { PropagateCompletion = true });
            TransformBlock.LinkTo(ActionBlock, new DataflowLinkOptions { PropagateCompletion = true });

            return this;
        }

        /// <summary>Transform step executed for every item.</summary>
        /// <param name="item">The item being processed.</param>
        /// <returns>The item to hand to the next block.</returns>
        public async Task<DownloadItem> DownloadAsync(DownloadItem item)
        {
            ...
            // The transform block needs a Task<DownloadItem> result to forward downstream.
            // Assumes the same item instance is passed on — TODO confirm against the elided body.
            return item;
        }

        /// <summary>Final step executed for every item leaving the transform block.</summary>
        /// <param name="item">The item that finished the transform step.</param>
        public async Task OnCompletion(DownloadItem item)
        {
            ...
        }
    }
    
    /// <summary>
    /// Message type carried through the blocks of <see cref="DownloadDataFlow"/>.
    /// Its members are elided in this example.
    /// </summary>
    public class DownloadItem
    {
        ...
    }
    

    And the code is run using:

    // First batch: send four items, then complete. CompleteAsync awaits the
    // action block's completion and calls ResetDataFlow afterwards.
    var connector = new DownloadConnector(10);
    await connector.SendAsync(new DownloadItem());
    await connector.SendAsync(new DownloadItem());
    await connector.SendAsync(new DownloadItem());
    await connector.SendAsync(new DownloadItem());
    await connector.CompleteAsync();
    
    // Second batch on the same connector: the blocks were rebuilt by the reset above.
    await connector.SendAsync(new DownloadItem());
    await connector.SendAsync(new DownloadItem());
    await connector.SendAsync(new DownloadItem());
    await connector.SendAsync(new DownloadItem());
    await connector.CompleteAsync();