How do I use a TPL TransformBlock when waiting for multiple inputs?


If I have multiple sources of data (say, from a database) and then need to perform some CPU-bound work on them, how do I represent this using TPL Dataflow?

I've noticed that TransformBlock takes a single input, but my input comes from multiple sources, and I want to make the most of parallelism to achieve this.

Is the best approach to use regular TPL or the Parallel Extensions to perform the I/O-bound database work, and then merge the results into a single input for the TransformBlock?
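For concreteness, the approach the question describes (plain TPL for the I/O-bound loads, then one merged input for the TransformBlock) might look roughly like the sketch below. `LoadAsync` and `WhenAllExample` are hypothetical, and `int` values stand in for real query results:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class WhenAllExample
{
    // Hypothetical I/O-bound loader standing in for a database query.
    static async Task<int> LoadAsync(int id)
    {
        await Task.Delay(50);
        return id * 10;
    }

    public static async Task<int> RunAsync()
    {
        // CPU-bound step that takes all the loaded values as one input.
        var work = new TransformBlock<int[], int>(values => values.Sum());

        // Start the I/O-bound loads concurrently and wait for all of them.
        int[] loaded = await Task.WhenAll(LoadAsync(1), LoadAsync(2), LoadAsync(3));

        // Merge point: the combined results enter the pipeline as a single message.
        await work.SendAsync(loaded);
        return await work.ReceiveAsync();
    }
}
```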


Solution

  • Have a look at JoinBlock; it might be what you need.
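
    A minimal sketch of the JoinBlock route, assuming exactly two inputs: `JoinBlock<T1, T2>` pairs one message from `Target1` with one from `Target2` and emits them together as a `Tuple<T1, T2>`, which a TransformBlock can then process as a single input. `JoinExample` and the two loader methods are hypothetical stand-ins for the database calls:

    ```csharp
    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class JoinExample
    {
        // Hypothetical stand-ins for two I/O-bound database queries.
        static async Task<int> LoadOrderCountAsync() { await Task.Delay(50); return 3; }
        static async Task<string> LoadCustomerNameAsync() { await Task.Delay(50); return "Ada"; }

        public static async Task<string> RunAsync()
        {
            // Pairs one item from Target1 with one item from Target2.
            var join = new JoinBlock<int, string>();

            // CPU-bound step that receives both inputs at once as a Tuple.
            var work = new TransformBlock<Tuple<int, string>, string>(
                pair => $"{pair.Item2} has {pair.Item1} orders");

            join.LinkTo(work, new DataflowLinkOptions { PropagateCompletion = true });

            // Start both loads before awaiting either, so they run concurrently,
            // then feed each result into its own join target.
            var countTask = LoadOrderCountAsync();
            var nameTask = LoadCustomerNameAsync();
            await join.Target1.SendAsync(await countTask);
            await join.Target2.SendAsync(await nameTask);

            return await work.ReceiveAsync();
        }
    }
    ```

    JoinBlock only comes in two- and three-target versions, so for a larger or variable number of inputs a batching approach like the custom block in this answer may fit better.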

    You can also create a custom dataflow block type that does exactly what you need.

    For example, suppose you expect 5 objects to arrive before you "process" them and return a single object (I use ExpandoObject here to illustrate) to a single receiver, which should also await asynchronously:

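    using System;
    using System.Collections.Generic;
    using System.Dynamic;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public sealed class YourCustomBlock : IPropagatorBlock<ExpandoObject, ExpandoObject>
    {
        // The target part of the block (items coming in).
        private readonly ITargetBlock<ExpandoObject> m_target;
        // The source part of the block (results going out).
        private readonly ISourceBlock<ExpandoObject> m_source;
        // Number of inputs to collect before processing them together.
        private readonly int _size;
        // Temporary holding area for incoming items. Only the head ActionBlock
        // touches it, and that block processes one item at a time by default.
        private readonly Queue<ExpandoObject> _queue;

        public YourCustomBlock(int inputs)
        {
            _size = inputs;
            _queue = new Queue<ExpandoObject>(_size);

            var mainWorker = new TransformBlock<ExpandoObject[], ExpandoObject>(async expandoArray =>
            {
                // Do your stuff with expandoArray here and return something
                // (an ExpandoObject in this example).
                await Task.Delay(1000).ConfigureAwait(false);

                var result = new ExpandoObject();
                ((IDictionary<string, object>)result)["InputCount"] = expandoArray.Length;
                return result;
            });

            var head = new ActionBlock<ExpandoObject>(async item =>
            {
                _queue.Enqueue(item);

                // Once the expected number of inputs has arrived, post them
                // as one array and start collecting the next group.
                if (_queue.Count == _size)
                {
                    await mainWorker.SendAsync(_queue.ToArray()).ConfigureAwait(false);
                    _queue.Clear();
                }
            });

            // Forward completion (or failure) of the head to the worker.
            head.Completion.ContinueWith(t =>
            {
                if (t.IsFaulted) ((IDataflowBlock)mainWorker).Fault(t.Exception);
                else mainWorker.Complete();
            });

            // Expose the two inner blocks as one block.
            m_source = mainWorker;
            m_target = head;
        }

        // IDataflowBlock: input is completed or faulted via the head, and the
        // block as a whole is done when the worker is done.
        public Task Completion => m_source.Completion;
        public void Complete() => m_target.Complete();
        public void Fault(Exception exception) => m_target.Fault(exception);

        // ITargetBlock: delegate to the head block.
        public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
            ExpandoObject messageValue, ISourceBlock<ExpandoObject> source, bool consumeToAccept)
            => m_target.OfferMessage(messageHeader, messageValue, source, consumeToAccept);

        // ISourceBlock: delegate to the worker block.
        public IDisposable LinkTo(ITargetBlock<ExpandoObject> target, DataflowLinkOptions linkOptions)
            => m_source.LinkTo(target, linkOptions);
        public ExpandoObject ConsumeMessage(DataflowMessageHeader messageHeader,
            ITargetBlock<ExpandoObject> target, out bool messageConsumed)
            => m_source.ConsumeMessage(messageHeader, target, out messageConsumed);
        public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<ExpandoObject> target)
            => m_source.ReserveMessage(messageHeader, target);
        public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<ExpandoObject> target)
            => m_source.ReleaseReservation(messageHeader, target);
    }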
    

    Sample use:

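    var myBlock = new YourCustomBlock(5);

    // Producer: send the five inputs (for example, the results of several queries).
    var producer = Task.Run(async () =>
    {
        for (var i = 0; i < 5; i++)
        {
            await myBlock.SendAsync(new ExpandoObject()).ConfigureAwait(false);
        }
    });

    // Consumer: asynchronously wait for the single combined result.
    var result = await myBlock.ReceiveAsync().ConfigureAwait(false);
    await producer;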
    

    Note: This has not been compile-checked and is just an illustration of the idea.
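
    For comparison, the grouping that the head ActionBlock performs is very close to what the built-in `BatchBlock<T>` does, and `DataflowBlock.Encapsulate` can expose two linked blocks as one `IPropagatorBlock`. A sketch of the same idea built from library blocks; `BatchExample` and `Create` are hypothetical names, and `int` stands in for ExpandoObject to keep the example small:

    ```csharp
    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class BatchExample
    {
        // Groups every `inputs` items into one array and runs `work` on that array.
        public static IPropagatorBlock<TIn, TOut> Create<TIn, TOut>(int inputs, Func<TIn[], TOut> work)
        {
            var batch = new BatchBlock<TIn>(inputs);
            var worker = new TransformBlock<TIn[], TOut>(work);

            // Propagate completion so completing the outer block also drains the worker.
            batch.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });

            // Expose the batch block as the input side and the worker as the output side.
            return DataflowBlock.Encapsulate(batch, worker);
        }

        public static async Task<int> RunAsync()
        {
            var block = Create<int, int>(5, items => items.Sum());
            for (var i = 1; i <= 5; i++)
            {
                await block.SendAsync(i);
            }
            return await block.ReceiveAsync();
        }
    }
    ```

    One behavioural difference: when a BatchBlock is completed, it emits any leftover items as a smaller final batch, whereas the custom block above never forwards an incomplete group.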