Tags: c#, task-parallel-library, pipeline, tpl-dataflow, bufferblock

TPL Dataflow Complete Pipeline when condition matches


I thought this would be a very basic approach, but I haven't found an example yet. I have one producer and one consumer, and I want to finish the pipeline once at least x objects have been processed. Additionally, I need to know which objects have been received.

This is how I do it:

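using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class BlockTester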
{
    private static TransformBlock<int, int> _worker;

    public static async Task StartAsync()
    {
        _worker = new TransformBlock<int, int>(s => s + s);
        var buffer = new BufferBlock<int>();
        var consumeTask = Consume(buffer);

        _worker.LinkTo(buffer, new DataflowLinkOptions{PropagateCompletion = true});

        foreach (var value in Enumerable.Range(0,100))
        {
            _worker.Post(value);
        }

        _worker.Complete();

        await buffer.Completion;

        if(buffer.TryReceiveAll(out var received))
        {
            Console.WriteLine(string.Join(", ", received));
        }
    }

    public static async Task<IReadOnlyCollection<int>> Consume(ISourceBlock<int> buffer)
    {
        var received = new List<int>();

        while (await buffer.OutputAvailableAsync())
        {
            var current = buffer.Receive();

            received.Add(current);

            if (current > 25)
            {
                _worker.Complete();
            }
        }

        return received;
    }
}

I am a bit confused about buffer.TryReceiveAll. What's the difference between awaiting the consume task and calling TryReceiveAll? Why does TryReceiveAll return false in my scenario? I suspect there's still something wrong with my approach.


Solution

  • Your Consume method should be an ActionBlock. There's no need to use OutputAvailableAsync or TryReceiveAll: replace the BufferBlock with an ActionBlock and do your processing inside the ActionBlock. It's not clear why you would need the TransformBlock either, unless you have more than one step in the process.

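    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public class BlockTester
    {
        //Could be removed
        private static TransformBlock<int, int> _worker;
    
        public static async Task StartAsync()
        {
            //Could be removed
            _worker = new TransformBlock<int, int>(s => s + s);
            var processor = new ActionBlock<int>(x => ProcessMessage(x));
    
            _worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });
    
            foreach (var value in Enumerable.Range(0, 100))
            {
                // Post returns false once _worker has been completed
                _worker.Post(value);
            }
    
            // Also complete here so the pipeline finishes even if the threshold
            // is never reached; calling Complete more than once is harmless
            _worker.Complete();
    
            await processor.Completion;
        }
    
        private static int itemsReceived = 0;
        public static void ProcessMessage(int x)
        {
            // Use the value returned by Interlocked.Increment rather than re-reading the field
            if (Interlocked.Increment(ref itemsReceived) > 25)
            {
                // Stops _worker accepting new input; items already queued still flow through
                _worker.Complete();
            }
            //process the message
            //log the message etc.
        }
    }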
    

    Or with a complex message object:

    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public class Message { }
    
    public class BlockTester
    {
        //Could be removed
        private static TransformBlock<Message, Message> _worker;
    
        public static async Task StartAsync()
        {
            //Could be removed
            _worker = new TransformBlock<Message, Message>(s => s);
            var processor = new ActionBlock<Message>(x => ProcessMessage(x));
    
            _worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });
    
            foreach (var value in Enumerable.Range(0, 100).Select(_ => new Message()))
            {
                _worker.Post(value);
            }
    
            // Ensures the pipeline completes even if the threshold is never reached
            _worker.Complete();
    
            await processor.Completion;
        }
    
        // Every message the ActionBlock has handled, so you know what was received
        private static ConcurrentBag<Message> itemsReceived = new ConcurrentBag<Message>();
        public static void ProcessMessage(Message x)
        {
            itemsReceived.Add(x);
            // Stops _worker accepting new input; messages already queued still flow through
            if (itemsReceived.Count > 25) _worker.Complete();
            //process the message
            //log the message etc.
        }
    }
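    One caveat with both versions: Complete() only stops the TransformBlock from accepting new messages; anything already in its input queue is still transformed and passed on. Since all 100 values are posted in a tight loop, most or all of them will usually be queued before the threshold is hit, so the ActionBlock may end up processing far more than 26 items. If the pipeline really has to stop early, one alternative (not part of the original answer) is to cancel the blocks through ExecutionDataflowBlockOptions.CancellationToken, which makes them discard their buffered messages. The following is a minimal sketch under those assumptions; EarlyStopPipeline and RunAsync are illustrative names, and System.Threading.Tasks.Dataflow is assumed to be referenced (via the NuGet package of that name if your target framework doesn't already include it):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EarlyStopPipeline
{
    // Runs a TransformBlock -> ActionBlock pipeline and returns the values the ActionBlock handled.
    // Both blocks share one cancellation token, canceled once `threshold` items were processed.
    public static async Task<IReadOnlyCollection<int>> RunAsync(int threshold)
    {
        using var cts = new CancellationTokenSource();
        var options = new ExecutionDataflowBlockOptions { CancellationToken = cts.Token };

        var received = new ConcurrentQueue<int>();
        var worker = new TransformBlock<int, int>(s => s + s, options);
        var processor = new ActionBlock<int>(x =>
        {
            received.Enqueue(x);
            // Canceling makes both blocks drop the messages they still have buffered.
            if (received.Count >= threshold) cts.Cancel();
        }, options);

        worker.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var value in Enumerable.Range(0, 100))
        {
            worker.Post(value); // returns false once the worker is canceled
        }

        // Normal completion path for when the threshold is never reached.
        worker.Complete();

        try
        {
            await processor.Completion;
        }
        catch (OperationCanceledException)
        {
            // Expected after an early stop: a canceled block's Completion ends in the Canceled state.
        }

        return received.ToArray();
    }
}
```

    RunAsync(25) returns the items handled before cancellation (at least 25, but fewer than 100), while a threshold above 100 is never reached and the pipeline completes normally with all 100 doubled values. Because canceling ends the blocks in the Canceled state rather than completing them, processor.Completion is awaited inside a try/catch.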
    

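    Edit: To answer the original question:

    Why does TryReceiveAll return false?

    Because by the time TryReceiveAll runs, the BufferBlock has already completed. A block can only complete once its output buffer is empty, so the Consume method had to pull every item out before the block was allowed to complete, and you then called TryReceiveAll on an empty block. The items aren't lost, though: Consume collected them into its list, so awaiting consumeTask (which returns that list) is how you get them back, whereas TryReceiveAll only sees whatever is still sitting in the buffer.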
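    The completion rule described above can be observed in isolation with a small sketch. BufferBlockDemo and RunAsync are illustrative names, and System.Threading.Tasks.Dataflow is assumed to be referenced. A BufferBlock that has been told to Complete() but still holds items does not report completion until those items are received; once it has been drained and completed, TryReceiveAll returns false, which is the situation in the question:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockDemo
{
    // Returns: whether Completion was done before draining, how many items were drained,
    // and what TryReceiveAll returns on the completed, empty block.
    public static async Task<(bool CompletedBeforeDrain, int Drained, bool SecondReceive)> RunAsync()
    {
        var buffer = new BufferBlock<int>();
        foreach (var value in Enumerable.Range(0, 5))
        {
            buffer.Post(value);
        }

        // No more input is accepted, but 5 items are still in the output buffer,
        // so the block cannot have completed yet.
        buffer.Complete();
        bool completedBeforeDrain = buffer.Completion.IsCompleted;

        // Draining the buffer is what lets the block finish completing.
        buffer.TryReceiveAll(out var items);
        await buffer.Completion;

        // Mirrors the question: complete and empty, so there is nothing left to receive.
        bool secondReceive = buffer.TryReceiveAll(out _);

        return (completedBeforeDrain, items.Count, secondReceive);
    }
}
```

    Calling RunAsync() yields (false, 5, false): the block only completes after its five items are taken out, and the second TryReceiveAll fails because nothing is left.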