I am trying to use the open-source library DataflowEx with the following Dataflow declaration:
class RequestClientFlow : Dataflow<string> {
    private readonly ILogger _logger;
    private readonly Dataflow<string, WebProxy> _webproxyDataflow;
    private readonly Dataflow<WebProxy, HttpClient> _httpClientDataflow;

    public RequestClientFlow(ILogger logger) : this(DataflowOptions.Default) {
        _logger = logger;
    }

    public Dataflow<WebProxy, HttpClient> HttpClientDataflow => _httpClientDataflow;

    public RequestClientFlow(DataflowOptions dataflowOptions) : base(dataflowOptions) {
        _webproxyDataflow = new TransformBlock<string, WebProxy>(s => {
            _logger.WriteLine("aaaa");
            return new WebProxy();
        }).ToDataflow();
        _httpClientDataflow = new TransformBlock<WebProxy, HttpClient>(proxy => {
            _logger.WriteLine("bbbb");
            return new HttpClient();
        }).ToDataflow();
        _webproxyDataflow.LinkTo(_httpClientDataflow);
        RegisterChild(_webproxyDataflow);
        RegisterChild(_httpClientDataflow);
    }

    public override ITargetBlock<string> InputBlock => _webproxyDataflow.InputBlock;
}
When I consume it like this:
var requestClientFlow = new RequestClientFlow(this);
requestClientFlow.Post("");
requestClientFlow.Complete();
await requestClientFlow.InputBlock.Completion;
it completes, and my output shows:
18:32:54.3773|aaaa
18:32:54.3773|bbbb
1 passed, 0 failed, 0 skipped, took 1.45 seconds (xUnit.net 2.3.1 build 3858).
However, my understanding from the framework docs is that I should also be able to use
requestClientFlow.Complete();
await requestClientFlow.CompletionTask;
or even
await requestClientFlow.SignalAndWaitForCompletionAsync();
but neither of these completes. Can someone please help me understand what I am doing wrong?
Your flow cannot complete because its last block is a TransformBlock. A TransformBlock's Completion task only finishes once its output buffer has been emptied. In your first example you await completion of the input block, which does in fact complete. The output block cannot complete because the items in its output buffer have nowhere to go. The DataflowEx library is correctly awaiting the final block in the flow. You can add an ActionBlock or a NullTarget to the end so that the flow can complete.
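To see the same behaviour without DataflowEx, here is a minimal sketch using plain TPL Dataflow. It assumes the System.Threading.Tasks.Dataflow package is referenced and that your compiler supports an async Main. The names CompletionDemo and LastBlockCompletesAsync are made up for this illustration, and the 2-second timeout is an arbitrary choice so the demo does not hang.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    // Hypothetical helper: returns true if the last block's Completion
    // task finishes within the timeout, false otherwise.
    public static async Task<bool> LastBlockCompletesAsync(bool drainOutput)
    {
        var first = new TransformBlock<string, int>(s => s.Length);
        var last = new TransformBlock<int, string>(n => n.ToString());

        first.LinkTo(last, new DataflowLinkOptions { PropagateCompletion = true });

        if (drainOutput)
        {
            // Give the last block's output somewhere to go by discarding it.
            last.LinkTo(DataflowBlock.NullTarget<string>());
        }

        first.Post("hello");
        first.Complete();

        // The first block completes either way: 'last' consumes its output.
        await first.Completion;

        // The last block completes only once its output buffer is empty.
        var finished = await Task.WhenAny(
            last.Completion,
            Task.Delay(TimeSpan.FromSeconds(2)));
        return finished == last.Completion;
    }

    public static async Task Main()
    {
        Console.WriteLine(await LastBlockCompletesAsync(drainOutput: false)); // False
        Console.WriteLine(await LastBlockCompletesAsync(drainOutput: true));  // True
    }
}
```

The first call mirrors your flow: the input side completes, but the trailing TransformBlock still holds one item in its output buffer, so its Completion never finishes. Linking it to a NullTarget (or ending the pipeline with an ActionBlock) removes that leftover item.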
In DataflowEx terms, the final flow should implement:
public interface IDataflow<in TIn> : IDataflow
{
    ITargetBlock<TIn> InputBlock { get; }
}
And as the example on the GitHub page for the library shows:
public class AggregatorFlow : Dataflow<string>
{
    //...//
    public AggregatorFlow() : base(DataflowOptions.Default)
    {
        _splitter = new TransformBlock<string, KeyValuePair<string, int>>(s => this.Split(s));
        _dict = new Dictionary<string, int>();

        //***Note The ActionBlock here***
        _aggregater = new ActionBlock<KeyValuePair<string, int>>(p => this.Aggregate(p));

        //Block linking
        _splitter.LinkTo(_aggregater, new DataflowLinkOptions() { PropagateCompletion = true });

        /* IMPORTANT */
        RegisterChild(_splitter);
        RegisterChild(_aggregater);
    }
    //...//
}