
Collecting data from multiple IO-based tasks from a single thread-based task


Using TPL, how do I collect results from multiple IO sources ("thread-less" tasks) and merge them into a sequence as they come in from their respective sources without spawning a thread based task per source to monitor them? Would it be safe to poll the sources from one thread?

while (true)
{
    try
    {
        IEnumerable<UdpClient> readyChannels = 
            from channel in channels
            where channel.Available > 0
            select channel;

        foreach (UdpClient channel in readyChannels)
        {
            var result = await channel.ReceiveAsync();
            // do something with result, like post to a dataflow block
        }
    }
    catch (Exception)
    {
        throw; // rethrow without resetting the stack trace
    }
    ...

How about something like that?


Solution

  • I see several options here:

    If you want to fire off the calls to ReceiveAsync(), set them up to do something with the result (e.g. send it to a dataflow block, like you said) and then forget about them, you could use ContinueWith():

    foreach (var channel in readyChannels)
    {
        channel.ReceiveAsync().ContinueWith(task =>
        {
            var result = task.Result;
            // do something with result, like post to a dataflow block
        });
    }
    

    One disadvantage of this is that you would need to handle exceptions in each continuation.
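
    To make that disadvantage concrete, here is a minimal sketch of a continuation that checks for a fault before touching the result. FakeReceiveAsync is a hypothetical stand-in for channel.ReceiveAsync() so the example runs without sockets; the class name and the two queues are also just illustrative.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Hypothetical stand-in for channel.ReceiveAsync(): completes with a value or faults.
    public static async Task<string> FakeReceiveAsync(int id, bool fail)
    {
        await Task.Delay(10 * id);
        if (fail) throw new InvalidOperationException($"source {id} failed");
        return $"datagram from {id}";
    }

    public static async Task<(string[] Results, string[] Errors)> RunAsync()
    {
        var results = new ConcurrentQueue<string>();
        var errors = new ConcurrentQueue<string>();
        var continuations = new List<Task>();

        for (int id = 1; id <= 3; id++)
        {
            Task<string> receive = FakeReceiveAsync(id, fail: id == 2);

            // Each continuation has to inspect the antecedent itself;
            // reading task.Result on a faulted task would throw AggregateException.
            continuations.Add(receive.ContinueWith(task =>
            {
                if (task.IsFaulted)
                    errors.Enqueue(task.Exception?.GetBaseException().Message ?? "unknown error");
                else
                    results.Enqueue(task.Result);
            }, TaskScheduler.Default));
        }

        // Only needed so the demo can return the collected output;
        // in a true fire-and-forget setup this wait would be dropped.
        await Task.WhenAll(continuations);
        return (results.ToArray(), errors.ToArray());
    }
}
```

    Calling await ContinuationDemo.RunAsync() collects two results and one error message; no source gets a dedicated thread, the continuations simply run on the thread pool when each read completes.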

    Probably a better approach is to use OrderByCompletion() from Stephen Cleary's AsyncEx. This way, you can start all reads at once and process them as they complete:

    var tasks = readyChannels.Select(c => c.ReceiveAsync()).OrderByCompletion();
    
    foreach (var task in tasks)
    {
       var result = await task;
       //do something with result like post to dataflow block.
    }
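
    If you would rather not take a dependency on AsyncEx, a similar effect can be sketched with Task.WhenAny() from the base class library. This is a different technique from OrderByCompletion(), not its implementation, and it rescans the pending list on every iteration, so it is best kept to a modest number of sources. FakeReceiveAsync is again a hypothetical stand-in for channel.ReceiveAsync().

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAnyDemo
{
    // Hypothetical stand-in for channel.ReceiveAsync(): returns its id after a delay.
    public static async Task<int> FakeReceiveAsync(int id, int delayMs)
    {
        await Task.Delay(delayMs);
        return id;
    }

    // Awaits the given tasks one by one, in the order in which they finish.
    public static async Task<List<int>> CollectInCompletionOrderAsync(IEnumerable<Task<int>> receives)
    {
        var pending = receives.ToList();   // all reads are already started here
        var completed = new List<int>();

        while (pending.Count > 0)
        {
            Task<int> finished = await Task.WhenAny(pending);
            pending.Remove(finished);

            // Awaiting the finished task rethrows its exception, if any.
            completed.Add(await finished);
        }

        return completed;
    }
}
```

    The loop body is the place where you would post each result to a dataflow block instead of adding it to a list.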
    

    Yet another option, useful for example if you want to limit parallelism, is to use TransformBlock:

    var receiveBlock = new TransformBlock<UdpClient, UdpReceiveResult>(
        c => c.ReceiveAsync(),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degreeOfParallelism });
    foreach (var channel in readyChannels)
        receiveBlock.Post(channel);
    receiveBlock.Complete();
    
    // set up processing here
    
    await receiveBlock.Completion;
    

    If you want to send the results to another block, then the processing setup marked by the comment above is simply a matter of linking the two blocks together:

    receiveBlock.LinkTo(anotherBlock);
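
    Put together, the dataflow variant might look like the sketch below. It assumes the System.Threading.Tasks.Dataflow package is referenced. FakeReceiveAsync and the int source ids are hypothetical stand-ins for channel.ReceiveAsync() and the UdpClient instances, and setting PropagateCompletion is my own choice so that awaiting the last block is enough.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineDemo
{
    // Hypothetical stand-in for channel.ReceiveAsync().
    public static async Task<string> FakeReceiveAsync(int sourceId)
    {
        await Task.Delay(20);
        return $"datagram from {sourceId}";
    }

    public static async Task<string[]> RunAsync(int sourceCount, int degreeOfParallelism)
    {
        var received = new ConcurrentQueue<string>();

        // At most degreeOfParallelism reads are in flight at any time.
        var receiveBlock = new TransformBlock<int, string>(
            id => FakeReceiveAsync(id),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degreeOfParallelism });

        // Plays the role of "anotherBlock": consumes each result as it arrives.
        var processBlock = new ActionBlock<string>(message => received.Enqueue(message));

        // Link before posting; PropagateCompletion forwards Complete() downstream.
        receiveBlock.LinkTo(processBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (int id = 1; id <= sourceCount; id++)
            receiveBlock.Post(id);
        receiveBlock.Complete();

        // The last block finishes only after every result has been processed.
        await processBlock.Completion;
        return received.ToArray();
    }
}
```

    Without a linked consumer, a TransformBlock keeps its output buffered and its Completion task does not finish, which is why the link is set up before awaiting.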
    

    In all of the cases above, no thread is ever blocked waiting on a source. But the code that calls ReceiveAsync() and then processes the result still has to execute somewhere.