Using the TPL, how do I collect results from multiple I/O sources ("thread-less" tasks) and merge them into a single sequence as they arrive from their respective sources, without spawning a thread-based task per source to monitor them? Would it be safe to poll the sources from one thread?
    while (true)
    {
        try
        {
            // Pick the channels that currently have data waiting.
            IEnumerable<UdpClient> readyChannels =
                from channel in channels
                where channel.Available > 0
                select channel;
            foreach (UdpClient channel in readyChannels)
            {
                var result = await channel.ReceiveAsync();
                //do something with result like post to dataflow block.
            }
        }
        catch (Exception e)
        {
            // Rethrow without resetting the stack trace.
            throw;
        }
...
How about something like that?
I see several options here:
If you want to fire off the calls to ReceiveAsync(), set them up to do something with the result (e.g. send it to a dataflow block, like you said) and then forget about them, you could use ContinueWith():
    foreach (var channel in readyChannels)
    {
        channel.ReceiveAsync().ContinueWith(task =>
        {
            var result = task.Result;
            //do something with result like post to dataflow block.
        });
    }
One disadvantage of this is that you would need to handle exceptions in each continuation.
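To make that concrete, here is a minimal sketch of a continuation that separates failures from results. Observe is a hypothetical helper name, and already-completed tasks stand in for ReceiveAsync() so the sample runs without sockets; treat it as an illustration rather than the one right way to do it.

```csharp
using System;
using System.Threading.Tasks;

static class ContinuationDemo
{
    // Hypothetical helper: attaches a continuation that routes a fault to
    // onError and a successful result to onResult.
    public static Task Observe<T>(Task<T> source, Action<T> onResult, Action<Exception> onError)
    {
        return source.ContinueWith(task =>
        {
            if (task.IsFaulted)
                onError(task.Exception.GetBaseException()); // unwrap the AggregateException
            else if (!task.IsCanceled)
                onResult(task.Result); // safe here: the task ran to completion
        });
    }

    static async Task Main()
    {
        // Stand-ins for channel.ReceiveAsync(): one success, one failure.
        var ok = Task.FromResult(42);
        var failed = Task.FromException<int>(new InvalidOperationException("socket closed"));

        await Task.WhenAll(
            Observe(ok, r => Console.WriteLine($"got {r}"), e => Console.WriteLine($"error: {e.Message}")),
            Observe(failed, r => Console.WriteLine($"got {r}"), e => Console.WriteLine($"error: {e.Message}")));
    }
}
```

Reading task.Result without the IsFaulted check would rethrow the failure inside the continuation, where nothing observes it.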
Probably a better approach is to use OrderByCompletion() from Stephen Cleary's AsyncEx. This way, you can start all reads at once and process them as they complete:
    var tasks = readyChannels.Select(c => c.ReceiveAsync()).OrderByCompletion();
    foreach (var task in tasks)
    {
        var result = await task;
        //do something with result like post to dataflow block.
    }
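If you would rather not take a dependency on AsyncEx, a rough equivalent can be sketched with the built-in Task.WhenAny(). This is a different technique from OrderByCompletion(), not its implementation, and it rescans the pending list on every iteration, so it is probably only reasonable for a modest number of channels. CollectInCompletionOrder and FakeReceive are hypothetical names; Task.Delay() stands in for ReceiveAsync().

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class WhenAnyDemo
{
    // Hypothetical helper: awaits the tasks one at a time in the order they finish.
    public static async Task<List<T>> CollectInCompletionOrder<T>(IEnumerable<Task<T>> source)
    {
        var pending = source.ToList(); // materializing the sequence starts all reads
        var results = new List<T>();
        while (pending.Count > 0)
        {
            Task<T> finished = await Task.WhenAny(pending);
            pending.Remove(finished);
            results.Add(await finished); // rethrows here if that task faulted
        }
        return results;
    }

    // Stand-in for channel.ReceiveAsync(): completes after the given delay.
    static async Task<string> FakeReceive(string name, int delayMs)
    {
        await Task.Delay(delayMs);
        return name;
    }

    static async Task Main()
    {
        var results = await CollectInCompletionOrder(new[]
        {
            FakeReceive("slow", 300),
            FakeReceive("fast", 50),
        });
        Console.WriteLine(string.Join(", ", results));
    }
}
```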
Yet another option, useful for example if you want to limit parallelism, is to use TransformBlock:
    var receiveBlock = new TransformBlock<UdpClient, UdpReceiveResult>(
        c => c.ReceiveAsync(),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degreeOfParallelism });
    foreach (var channel in readyChannels)
        receiveBlock.Post(channel);
    receiveBlock.Complete();
    // set up processing here
    await receiveBlock.Completion;
If you want to send the results to another block, then the processing mentioned in a comment above consists of simply linking them together:
    receiveBlock.LinkTo(anotherBlock);
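One thing to watch for when linking: LinkTo() does not propagate completion by default, so if you want to await the downstream block instead, you would likely pass DataflowLinkOptions with PropagateCompletion set. A minimal sketch, assuming the System.Threading.Tasks.Dataflow library is available to the project; integers and Task.Delay() stand in for UdpClient and ReceiveAsync(), and SumThroughPipeline is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class LinkDemo
{
    // Hypothetical pipeline: doubles each input with bounded parallelism,
    // then sums the outputs in a linked block.
    public static async Task<int> SumThroughPipeline(int[] inputs, int degreeOfParallelism)
    {
        // Stand-in for c => c.ReceiveAsync().
        var receiveBlock = new TransformBlock<int, int>(
            async n => { await Task.Delay(10); return n * 2; },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degreeOfParallelism });

        int sum = 0;
        // ActionBlock processes one item at a time by default, so sum is not raced.
        var anotherBlock = new ActionBlock<int>(n => sum += n);

        // Without PropagateCompletion, anotherBlock.Completion would never finish.
        receiveBlock.LinkTo(anotherBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var n in inputs)
            receiveBlock.Post(n);
        receiveBlock.Complete();

        await anotherBlock.Completion;
        return sum;
    }

    static async Task Main()
    {
        // Inputs 1, 2, 3 are doubled to 2, 4, 6 and summed: prints 12.
        Console.WriteLine(await SumThroughPipeline(new[] { 1, 2, 3 }, degreeOfParallelism: 2));
    }
}
```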
In all of the cases above, no thread is ever blocked just to monitor a source. But the code that calls ReceiveAsync() and then processes the result still has to execute somewhere.