I am trying to create a pipeline using TPL Dataflow where I can store messages in a batch block, and whenever its threshold is hit it would send the data to an action block. I have added a buffer block in case the action block is too slow.
So far I have tried every method I can think of to move data from the first block to the second, to no avail. I have linked the blocks and added DataflowLinkOptions with PropagateCompletion set to true. What else do I have to do in order for this pipeline to work?
Pipeline
class LogPipeline<T>
{
    private ActionBlock<T[]> actionBlock;
    private BufferBlock<T> bufferBlock;
    private BatchBlock<T> batchBlock;
    private readonly Action<T[]> action;
    private readonly int BufferSize;
    private readonly int BatchSize;
    public LogPipeline(Action<T[]> action, int bufferSize = 4, int batchSize = 2)
    {
        this.BufferSize = bufferSize;
        this.BatchSize = batchSize;
        this.action = action;
    }
    private void Initialize()
    {
        this.bufferBlock = new BufferBlock<T>(new DataflowBlockOptions
        {
            TaskScheduler = TaskScheduler.Default,
            BoundedCapacity = this.BufferSize
        });
        this.actionBlock = new ActionBlock<T[]>(this.action);
        this.batchBlock = new BatchBlock<T>(BatchSize);
        this.bufferBlock.LinkTo(this.batchBlock, new DataflowLinkOptions
            { PropagateCompletion = true });
        this.batchBlock.LinkTo(this.actionBlock, new DataflowLinkOptions
            { PropagateCompletion = true });
    }
    public void Post(T log)
    {
        this.bufferBlock.Post(log);
    }
    public void Start()
    {
        this.Initialize();
    }
    public void Stop()
    {
        actionBlock.Complete();
    }
}
Test
[TestCase(100, 1000, 5)]
public void CanBatchPipelineResults(int batchSize, int bufferSize, int cycles)
{
    List<int> data = new List<int>();
    LogPipeline<int> logPipeline = new LogPipeline<int>(
        batchSize: batchSize,
        bufferSize: bufferSize,
        action: (logs) =>
        {
            data.AddRange(logs);
        });
    logPipeline.Start();
    int SelectWithEffect(int element)
    {
        logPipeline.Post(element);
        return 3;
    }
    int count = 0;
    while (true)
    {
        if (count++ > cycles)
        {
            break;
        }
        var sent = Parallel.For(0, bufferSize, (x) => SelectWithEffect(x));
    }
    logPipeline.Stop();
    Assert.IsTrue(data.Count == cycles * batchSize);
}
Why are all my blocks empty except the buffer? I have also tried SendAsync, to no avail. No data is moved from the first block to the next no matter what I do.
I have tried both with and without the link options.
Update:
I have completely removed the pipeline class and also the Parallel.For loop.
I have tried all kinds of input blocks (batch/buffer/transform), and it seems there is no way the subsequent blocks receive anything.
I have also tried await SendAsync as well as Post.
I have only tried this within unit test classes.
Could this be the issue?
Update 2: I was overcomplicating things, so I tried a simpler example. Inside a test case even this doesn't work:
List<int> items = new List<int>();
var tf = new TransformBlock<int, int>(x => x + 1);
var action = new ActionBlock<int>(x => items.Add(x));
tf.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });
tf.Post(3);
//Breakpoint here
The reason nothing seems to happen before the test ends is that none of the blocks gets a chance to run. The code keeps all CPUs busy with Parallel.For, so no other task gets a chance to run. This means that all posted messages are still in the first block. The code then calls Complete on the last block instead of the first one, and doesn't even await its Completion before checking the results.
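Applied to the smaller example from Update 2, a minimal sketch of the missing steps looks like this: complete the head block, then await the last block's Completion before inspecting the results. It assumes .NET 6+ top-level statements and a reference to the System.Threading.Tasks.Dataflow package; it is an illustration, not the only way to structure the test.

```csharp
// Sketch only: assumes .NET 6+ top-level statements and a reference
// to the System.Threading.Tasks.Dataflow package.
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

var items = new List<int>();
var tf = new TransformBlock<int, int>(x => x + 1);
var action = new ActionBlock<int>(x => items.Add(x));

// PropagateCompletion forwards tf's completion to the action block.
tf.LinkTo(action, new DataflowLinkOptions { PropagateCompletion = true });

tf.Post(3);

// Here the message may still be queued: blocks process data on
// thread-pool tasks, not inside Post, so a breakpoint can see nothing yet.
tf.Complete();            // no more input for the head block
await action.Completion;  // wait until the tail has processed everything

Console.WriteLine(string.Join(", ", items)); // prints "4"
```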
The code can be simplified a lot. For starters, all blocks have input buffers, so they don't need extra buffering.
The pipeline could be replaced with just this:
[TestCase(100, 5)]
public async Task CanBatchPipelineResults(int batchSize, int cycles)
{
    //Arrange
    var list = new List<int>();
    var head = new BatchBlock<int>(batchSize);
    var act = new ActionBlock<int[]>(nums => list.AddRange(nums));
    var options = new DataflowLinkOptions { PropagateCompletion = true };
    head.LinkTo(act, options);
    //ACT
    //Just fire everything at once, because why not
    var tasks = Enumerable.Range(0, cycles)
                          .Select(i => Task.Run(() => head.Post(i)));
    await Task.WhenAll(tasks);
    //Tell the head block we're done
    head.Complete();
    //Wait for the last block to complete
    await act.Completion;
    //ASSERT
    Assert.AreEqual(cycles, list.Count);
}
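Completing the head block matters for a second reason: a BatchBlock only emits a batch once it has batchSize items, or when it is completed, at which point it emits whatever is left as a final, smaller batch. In the original test the head is never completed (the tail is), so any partial batch would stay stuck in the batch block. A small sketch under the same assumptions (.NET 6+ top-level statements, Dataflow package referenced):

```csharp
// Sketch only: assumes .NET 6+ top-level statements and a reference
// to the System.Threading.Tasks.Dataflow package.
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

var batches = new List<int[]>();
var head = new BatchBlock<int>(100);   // batch size far larger than the input
var tail = new ActionBlock<int[]>(b => batches.Add(b));
head.LinkTo(tail, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 5; i++)
{
    head.Post(i);
}

// Only 5 of 100 slots are filled, so the batch block keeps them buffered.
// Completing the head makes it emit the leftovers as a final, smaller
// batch and then propagate completion to the tail.
head.Complete();
await tail.Completion;

Console.WriteLine($"{batches.Count} batch(es), {batches[0].Length} item(s)");
// prints "1 batch(es), 5 item(s)"
```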
There's no real need to create a complex class to encapsulate the pipeline. It doesn't "start": the blocks do nothing if they have no data. To abstract it, one only needs to provide access to the head block and the last block's Completion task.
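One way to follow that advice, sketched with a hypothetical CreateBatchPipeline helper (not part of TPL Dataflow), is to return only the head block and the tail's Completion task as a tuple. "Stopping" then means calling Complete on the head and awaiting Completion. It assumes .NET 6+ top-level statements and a reference to the System.Threading.Tasks.Dataflow package.

```csharp
// Sketch only: assumes .NET 6+ top-level statements and a reference
// to the System.Threading.Tasks.Dataflow package.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: builds BatchBlock -> ActionBlock and exposes only
// what callers need, the head to post to and the tail's Completion.
static (ITargetBlock<T> Head, Task Completion) CreateBatchPipeline<T>(
    Action<T[]> action, int batchSize)
{
    var batch = new BatchBlock<T>(batchSize);
    var act = new ActionBlock<T[]>(action);
    batch.LinkTo(act, new DataflowLinkOptions { PropagateCompletion = true });
    return (batch, act.Completion);
}

var received = new List<int>();
var pipeline = CreateBatchPipeline<int>(logs => received.AddRange(logs), batchSize: 2);

for (int i = 0; i < 7; i++)
{
    await pipeline.Head.SendAsync(i);
}

pipeline.Head.Complete();   // "Stop" = complete the head...
await pipeline.Completion;  // ...and wait for the tail to drain

Console.WriteLine(received.Count); // prints "7"
```

A tuple keeps the sketch short; a small class or record exposing the same two members would work just as well.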