I have a pipeline that is not getting registering as complete even though all the data has been processed and displayed on the console. I have it set to wait on the completion, but it never finishes and isn't allowing the method to return.
TransformBlock<string, CompanyInfo> GetCompanyInfo;
TransformBlock<string, List<Dividend>> GetDividendReports;
TransformBlock<string, KeyStats> GetKeyStatInfo;
TransformBlock<string, List<Interval>> GetIntervalReports;
TransformBlock<List<Interval>, List<decimal>> GetChangesOverInterval;
BroadcastBlock<string> broadcastSymbol;
TransformBlock<Tuple<List<decimal>, List<Dividend>, KeyStats>, string> GenerateXmlString;
ActionBlock<string> GenerateCompleteReport;
CancellationTokenSource cancellationTokenSource;
public Task StartPipeline()
{
cancellationTokenSource = new CancellationTokenSource();
ExecutionDataflowBlockOptions executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
CancellationToken = cancellationTokenSource.Token,
MaxDegreeOfParallelism = MAXPARA
};
broadcastSymbol = new BroadcastBlock<string>(symbol => symbol);
var joinblock = new JoinBlock<List<decimal>, List<Dividend>, KeyStats>(new GroupingDataflowBlockOptions { Greedy = false });
GetCompanyInfo = new TransformBlock<string, CompanyInfo>(symbol =>
{
return RetrieveCompanyInfo(symbol);
}, executionDataflowBlockOptions);
GetDividendReports = new TransformBlock<string, List<Dividend>>(symbol =>
{
return RetrieveDividendInfo(symbol);
}, executionDataflowBlockOptions);
GetKeyStatInfo = new TransformBlock<string, KeyStats>(symbol =>
{
return RetrieveKeyStats(symbol);
}, executionDataflowBlockOptions);
GetIntervalReports = new TransformBlock<string, List<Interval>>(symbol =>
{
return RetrieveIntervals(symbol, 30);
}, executionDataflowBlockOptions);
GetChangesOverInterval = new TransformBlock<List<Interval>, List<decimal>>(intervals =>
{
return ConstructIntervalReport(intervals);
}, executionDataflowBlockOptions);
GenerateXmlString = new TransformBlock<Tuple<List<decimal>, List<Dividend>, KeyStats>, string>(tup =>
{
var ReportObj = new Report
{
changeIntervals = tup.Item1,
dividends = tup.Item2,
keyStats = tup.Item3
};
XmlSerializer ser = new XmlSerializer(typeof(Report));
var stringWriter = new StringWriter();
ser.Serialize(stringWriter, ReportObj);
return stringWriter.ToString();
}, executionDataflowBlockOptions);
GenerateCompleteReport = new ActionBlock<string>(xml =>
{
var str = Path.GetRandomFileName().Replace(".", "") + ".xml";
File.WriteAllText(str, xml);
Console.WriteLine("Finished File");
}, executionDataflowBlockOptions);
var options = new DataflowLinkOptions { PropagateCompletion = true };
var buffer = new BufferBlock<string>();
buffer.LinkTo(broadcastSymbol);
//Broadcasts the symbol
broadcastSymbol.LinkTo(GetIntervalReports, options);
broadcastSymbol.LinkTo(GetDividendReports, options);
broadcastSymbol.LinkTo(GetKeyStatInfo, options);
//Second teir parallel
GetIntervalReports.LinkTo(GetChangesOverInterval, options);
//Joins the parallel blocks back together
GetDividendReports.LinkTo(joinblock.Target2, options);
GetKeyStatInfo.LinkTo(joinblock.Target3, options);
GetChangesOverInterval.LinkTo(joinblock.Target1, options);
joinblock.LinkTo(GenerateXmlString, options);
GenerateXmlString.LinkTo(GenerateCompleteReport, options);
buffer.Post("F");
buffer.Post("AGFS");
buffer.Post("BAC");
buffer.Post("FCF");
buffer.Complete();
GenerateCompleteReport.Completion.Wait(cancellationTokenSource.Token);
}
I'm not sure why it isn't returning from the pipeline with an exception or completion. When the program runs it shows all the files being created and stops, but no code executes after the wait completion. Shouldn't the PropagateCompletion allow for the blocks to know when they have completed their actions or transforms?
Your not passing the link options to your BufferBlock
so completion is not being propagated. On another note only one linked block will receive completion from your BroadcastBlock
. If you want wait for all three linked blocks you'll have to handle that explicitly on your own. See here for an example
Additionally since the method already returns a Task
its unnecessary to return a Task.CompletedTask
, you could simply use async/sawait
instead of blocking with .Wait()
. And what would you expect a caller that await
s this method to do with null
?
if (GenerateCompleteReport.Completion.IsCompletedSuccessfully) { return Task.CompletedTask; }
return null;
Instead you could:
await enerateCompleteReport.Completion