I thought this would be a very basic approach, but I haven't found an example yet. I have one producer and one consumer, and I want to finish the pipeline once at least x objects have been processed. Additionally, I need to know which objects have been received.
This is how I do it:
public class BlockTester
{
    private static TransformBlock<int, int> _worker;

    public static async Task StartAsync()
    {
        _worker = new TransformBlock<int, int>(s => s + s);
        var buffer = new BufferBlock<int>();
        var consumeTask = Consume(buffer);
        _worker.LinkTo(buffer, new DataflowLinkOptions { PropagateCompletion = true });
        foreach (var value in Enumerable.Range(0, 100))
        {
            _worker.Post(value);
        }
        _worker.Complete();
        await buffer.Completion;
        if (buffer.TryReceiveAll(out var received))
        {
            Console.WriteLine(string.Join(", ", received));
        }
    }

    public static async Task<IReadOnlyCollection<int>> Consume(ISourceBlock<int> buffer)
    {
        var received = new List<int>();
        while (await buffer.OutputAvailableAsync())
        {
            var current = buffer.Receive();
            received.Add(current);
            if (current > 25)
            {
                _worker.Complete();
            }
        }
        return received;
    }
}
I am a bit confused about buffer.TryReceiveAll. What's the difference between awaiting the consume task and calling TryReceiveAll? Why does TryReceiveAll return false in my scenario? I guess there's still something wrong with my approach to reaching my goals.
Your Consume method should be an ActionBlock. There's no need to use OutputAvailableAsync or TryReceiveAll. Replace the BufferBlock with an ActionBlock and do your processing within the ActionBlock. It's not clear why you would need the TransformBlock either, unless you have more than one step in the process.
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class BlockTester
{
    // Could be removed
    private static TransformBlock<int, int> _worker;

    public static async Task StartAsync()
    {
        // Could be removed
        _worker = new TransformBlock<int, int>(s => s + s);
        var processor = new ActionBlock<int>(x => ProcessMessage(x));
        _worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });
        foreach (var value in Enumerable.Range(0, 100))
        {
            _worker.Post(value);
        }
        //_worker.Complete();
        await processor.Completion;
    }

    private static int itemsReceived = 0;

    public static void ProcessMessage(int x)
    {
        // Use the value returned by Increment instead of re-reading the field,
        // so the check stays correct if the block ever runs in parallel.
        var count = Interlocked.Increment(ref itemsReceived);
        if (count > 25) _worker.Complete();
        // process the message
        // log the message etc.
    }
}
Or with a complex message object:
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class Message { }

public class BlockTester
{
    // Could be removed
    private static TransformBlock<Message, Message> _worker;

    public static async Task StartAsync()
    {
        // Could be removed
        _worker = new TransformBlock<Message, Message>(s => s);
        var processor = new ActionBlock<Message>(x => ProcessMessage(x));
        _worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });
        foreach (var value in Enumerable.Range(0, 100).Select(_ => new Message()))
        {
            _worker.Post(value);
        }
        //_worker.Complete();
        await processor.Completion;
    }

    // Keeps every received message so it can be inspected after completion.
    private static ConcurrentBag<Message> itemsReceived = new ConcurrentBag<Message>();

    public static void ProcessMessage(Message x)
    {
        itemsReceived.Add(x);
        if (itemsReceived.Count > 25) _worker.Complete();
        // process the message
        // log the message etc.
    }
}
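One caveat about the examples above: Complete() only tells a block to stop accepting new input. Items that were already posted stay in the input queue and are still processed, so if all 100 items are posted before the threshold is reached, all 100 will most likely still flow through. If the goal is to really stop early, one option is to cancel the blocks instead. The following is a minimal sketch under that assumption, not a definitive implementation; the names EarlyStopSketch, RunAsync and wanted are hypothetical, and it uses a `using` declaration, which needs C# 8 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EarlyStopSketch
{
    // Hypothetical helper: processes posted values until at least `wanted`
    // items were handled, then cancels the pipeline and returns what was seen.
    public static async Task<int[]> RunAsync(int wanted)
    {
        using var cts = new CancellationTokenSource();
        var options = new ExecutionDataflowBlockOptions { CancellationToken = cts.Token };
        var received = new ConcurrentQueue<int>();

        var worker = new TransformBlock<int, int>(s => s + s, options);
        var processor = new ActionBlock<int>(x =>
        {
            received.Enqueue(x);
            // Cancelling makes both blocks drop whatever is still buffered.
            if (received.Count >= wanted) cts.Cancel();
        }, options);
        worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        for (var value = 0; value < 100; value++)
        {
            worker.Post(value);
        }
        // Still signal end of input so the pipeline also finishes
        // when fewer than `wanted` items ever arrive.
        worker.Complete();

        try
        {
            await processor.Completion;
        }
        catch (OperationCanceledException)
        {
            // Expected when the pipeline was stopped early via the token.
        }
        return received.ToArray();
    }
}
```

Calling `await EarlyStopSketch.RunAsync(26)` should return roughly the first 26 doubled values rather than all 100. The trade-off is that a cancelled block ends in the Canceled state, so awaiting its Completion throws and has to be handled as shown.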
Edit: To answer the original question, why does TryReceiveAll return false?
Because by the time TryReceiveAll runs, the BufferBlock has "completed". For a block to complete, its output buffer must contain 0 items. The Consume method pulled all the items out before the block was allowed to complete, so you finally called TryReceiveAll on an empty block.
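If you want to keep the BufferBlock approach from the question, the received items are available from the task returned by the consume method, not from TryReceiveAll. Here is a minimal sketch of that idea, assuming the same pipeline shape as in the question; the class name ConsumeTaskSketch and the method names RunAsync and ConsumeAsync are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ConsumeTaskSketch
{
    public static async Task<IReadOnlyCollection<int>> RunAsync()
    {
        var worker = new TransformBlock<int, int>(s => s + s);
        var buffer = new BufferBlock<int>();
        worker.LinkTo(buffer, new DataflowLinkOptions { PropagateCompletion = true });

        // Start draining the buffer before any items are posted.
        var consumeTask = ConsumeAsync(buffer);

        for (var value = 0; value < 100; value++)
        {
            worker.Post(value);
        }
        worker.Complete();

        // The consume task holds everything that was pulled out of the buffer.
        var received = await consumeTask;

        // By now the buffer has been emptied by the consumer, which is why
        // TryReceiveAll has nothing left to hand out.
        await buffer.Completion;
        Console.WriteLine(buffer.TryReceiveAll(out _));

        return received;
    }

    private static async Task<IReadOnlyCollection<int>> ConsumeAsync(ISourceBlock<int> source)
    {
        var received = new List<int>();
        while (await source.OutputAvailableAsync())
        {
            // Single consumer, so an item is available after OutputAvailableAsync.
            received.Add(await source.ReceiveAsync());
        }
        return received;
    }
}
```

In short, awaiting the consume task gives you the items the consumer already took, while TryReceiveAll only returns items still sitting in the block's output buffer at the moment it is called.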