I have been working with TPL Dataflow and ran into a very irritating problem in code that uses a TransformBlock linked to an ActionBlock.
Eventually I found that items got stuck in the TransformBlock's output queue: its OutputCount property kept returning a value greater than 0.
That is why the whole application deadlocked. However, it unblocks as soon as I call TransformBlock.TryReceiveAll().
Can anyone please tell me whether I missed something, or how to prevent this behavior?
static void Main()
{
    int total = 0;
    int itemsProcessing = 0;
    TransformBlock<int, Tuple<int, double>> transformBlock = new TransformBlock<int, Tuple<int, double>>(
        i => new Tuple<int, double>(i, Math.Sqrt(i)),
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 20,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });
    ActionBlock<Tuple<int, double>> outputBlock = new ActionBlock<Tuple<int, double>>(async tuple =>
        {
            await Task.Delay(1000); // simulating data output delay
            Interlocked.Decrement(ref itemsProcessing);
        },
        new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 5,
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });
    transformBlock.Completion.ContinueWith(t => outputBlock.Complete());
    using (Timer timer = new Timer(o =>
        {
            Console.Title = string.Format(
                "{0}: {1}/{2} {3}/{4}/{5}",
                Assembly.GetExecutingAssembly().GetName().Name,
                Volatile.Read(ref itemsProcessing), Volatile.Read(ref total),
                transformBlock.InputCount, transformBlock.OutputCount, outputBlock.InputCount);
        }, null, 100, 100))
    {
        using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
        {
            for (int i = 0; i < 40; i++)
            {
                Thread.Sleep(100); // simulating new item retrieval delay
                Interlocked.Increment(ref total);
                Interlocked.Increment(ref itemsProcessing);
                transformBlock.SendAsync(i).Wait();
            }
        }
        Console.WriteLine("Enqueued");
        transformBlock.Complete();
        outputBlock.Completion.Wait();
        Console.WriteLine("Finish");
        timer.Change(Timeout.Infinite, Timeout.Infinite);
        timer.Dispose();
    }
}
Calling TransformBlock.LinkTo returns a disposable registration. When you dispose of that registration, the blocks are unlinked.
Your using scope ends too soon: the blocks are unlinked before the TransformBlock has a chance to empty itself into the ActionBlock, so items remain in its output queue and it can never complete. Since the first block doesn't complete, the next one doesn't even start completing, let alone finish.
Moving the Complete() call and the wait on outputBlock.Completion inside the using block solves the deadlock:
using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
{
    for (int i = 0; i < 40; i++)
    {
        Thread.Sleep(100); // simulating new item retrieval delay
        Interlocked.Increment(ref total);
        Interlocked.Increment(ref itemsProcessing);
        transformBlock.SendAsync(i).Wait();
    }
    Console.WriteLine("Enqueued");
    transformBlock.Complete();
    outputBlock.Completion.Wait();
    Console.WriteLine("Finish");
}
As a side note, you really shouldn't block on async code this way. It would be much simpler to use async-await instead of Wait(), Task.Delay instead of Thread.Sleep, etc.
Also, since you're using PropagateCompletion, you don't need to call outputBlock.Complete() explicitly, so the ContinueWith line can be removed.
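Putting both side notes together, a fully asynchronous version might look roughly like the sketch below. It assumes C# 7.1 or later (for async Main) and a reference to System.Threading.Tasks.Dataflow; the counters and the title-updating timer from the question are left out for brevity, and the Program class name is only a placeholder.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
{
    // Assumption: async Main requires C# 7.1 or later.
    static async Task Main()
    {
        var transformBlock = new TransformBlock<int, Tuple<int, double>>(
            i => Tuple.Create(i, Math.Sqrt(i)),
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 20,
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        var outputBlock = new ActionBlock<Tuple<int, double>>(
            async tuple => await Task.Delay(1000), // simulating data output delay
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 5,
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        // PropagateCompletion completes outputBlock once transformBlock has
        // completed and drained, so no ContinueWith is needed.
        using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
        {
            for (int i = 0; i < 40; i++)
            {
                await Task.Delay(100);             // simulating new item retrieval delay
                await transformBlock.SendAsync(i); // waits asynchronously while the block is full
            }

            Console.WriteLine("Enqueued");
            transformBlock.Complete();

            // Await completion while the link is still in place, so the
            // remaining items can flow into outputBlock.
            await outputBlock.Completion;
            Console.WriteLine("Finish");
        }
    }
}
```

The important part is unchanged from the fix above: the link is disposed only after outputBlock.Completion has finished. If you never need to unlink the blocks, you could also just not dispose the registration at all.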