Search code examples
c#tpl-dataflow

DataflowEx never completes


I am trying to use the open source lib DataflowEx with the next Dataflow declaration.

class RequestClientFlow :Dataflow<string>{
    private readonly ILogger _logger;
    private readonly Dataflow<string, WebProxy> _webproxyDataflow;
    private readonly Dataflow<WebProxy, HttpClient> _httpClientDataflow;

    public RequestClientFlow(ILogger logger) : this(DataflowOptions.Default){
        _logger = logger;
    }

    public Dataflow<WebProxy, HttpClient> HttpClientDataflow => _httpClientDataflow;

    public RequestClientFlow(DataflowOptions dataflowOptions) : base(dataflowOptions){
        _webproxyDataflow = new TransformBlock<string,WebProxy>(s => {
            _logger.WriteLine("aaaa");
            return new WebProxy();
        }).ToDataflow();
        _httpClientDataflow = new TransformBlock<WebProxy,HttpClient>(proxy => {
            _logger.WriteLine("bbbb");
            return new HttpClient();
        }).ToDataflow();
        _webproxyDataflow.LinkTo(_httpClientDataflow);
        RegisterChild(_webproxyDataflow);
        RegisterChild(_httpClientDataflow);
    }

    public override ITargetBlock<string> InputBlock => _webproxyDataflow.InputBlock;
}

when I consume it like

var requestClientFlow = new RequestClientFlow(this);
requestClientFlow.Post("");
requestClientFlow.Complete();
await requestClientFlow.InputBlock.Completion;

it completes and my output displays

18:32:54.3773|aaaa 18:32:54.3773|bbbb

1 passed, 0 failed, 0 skipped, took 1.45 seconds (xUnit.net 2.3.1 build 3858).

However my understand is from the framework docs that i should also be able to use

    requestClientFlow.Complete();
    await requestClientFlow.CompletionTask;

or even

await requestClientFlow.SignalAndWaitForCompletionAsync();

it does not complete. Can someone please help me understand what I do wrong?


Solution

  • Your flow cannot complete since the last block is a TransformBlock. In your first example you await completion of the Input block which does in fact complete. The Output block cannot complete as the items in its output buffer have nowhere to go. The DataflowEx library is correctly awaiting on the final block in the flow. You can add an ActionBlock or NullTarget to the end to realize completion.

    In terms of DataflowEx the final flow should be implmeneting

    public interface IDataflow<in TIn> : IDataflow
    {
        ITargetBlock<TIn> InputBlock { get; }
    }
    

    And as the example on the github page for the library shows:

    public class AggregatorFlow : Dataflow<string>
    {
        //...//
    
        public AggregatorFlow() : base(DataflowOptions.Default)
        {
            _splitter = new TransformBlock<string, KeyValuePair<string, int>>(s => this.Split(s));
            _dict = new Dictionary<string, int>();
    
            //***Note The ActionBlock here***
            _aggregater = new ActionBlock<KeyValuePair<string, int>>(p => this.Aggregate(p));
    
            //Block linking
            _splitter.LinkTo(_aggregater, new DataflowLinkOptions() { PropagateCompletion = true });
    
            /* IMPORTANT */
            RegisterChild(_splitter);
            RegisterChild(_aggregater);
        }
    
        //...//
    }