I need to receive data from more than one IAsyncEnumerable source. For performance reasons, the sources should be consumed in parallel.
I wrote the following code to achieve this, using the AsyncAwaitBestPractices, System.Threading.Tasks.Dataflow and System.Linq.Async NuGet packages:
public static async IAsyncEnumerable<T> ExecuteSimultaneouslyAsync<T>(
    this IEnumerable<IAsyncEnumerable<T>> sources,
    int outputQueueCapacity = 1,
    TaskScheduler scheduler = null)
{
    var sourcesCount = sources.Count();
    var channel = outputQueueCapacity > 0
        ? Channel.CreateBounded<T>(sourcesCount)
        : Channel.CreateUnbounded<T>();
    sources.AsyncParallelForEach(
            async body =>
            {
                await foreach (var item in body)
                {
                    await channel.Writer.WaitToWriteAsync();
                    await channel.Writer.WriteAsync(item);
                }
            },
            maxDegreeOfParallelism: sourcesCount,
            scheduler: scheduler)
        .ContinueWith(_ => channel.Writer.Complete())
        .SafeFireAndForget();
    while (await channel.Reader.WaitToReadAsync())
        yield return await channel.Reader.ReadAsync();
}
public static async Task AsyncParallelForEach<T>(
    this IEnumerable<T> source,
    Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
        options.TaskScheduler = scheduler;
    var block = new ActionBlock<T>(body, options);
    foreach (var item in source)
        block.Post(item);
    block.Complete();
    await block.Completion;
}
This code works fine until two or more sources throw an exception. In that case the second exception cannot be handled, and in some cases it crashes the application.
So I wonder: is there a better way to consume data from several IAsyncEnumerable sources in parallel?
Keeping a pipeline running in case of exceptions is extremely difficult whether it's a functional or CSP pipeline. In most cases a pipeline will need to keep working even in case of individual message failures. A failing message doesn't mean the entire pipeline has failed.
That's why Railway-oriented programming is used to wrap messages and errors into Result<TOk,TError> wrappers and "redirect" or ignore error messages. Such a class makes programming Dataflow, Channels and IAsyncEnumerable pipelines a lot easier.
In F#, using discriminated unions, one could define a Result type with just:
type Result<'T,'TError> =
    | Ok of ResultValue:'T
    | Error of ErrorValue:'TError
DUs aren't in C# yet, so various alternatives have been proposed: some inherit from an IResult<> base, while others use classes or records that allow exhaustive pattern matching, something not available with the IResult<> techniques.
Let's assume the Result<> here is:
public record Result<T>(T? result, Exception? error)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) =>
        new(default, exception);
    public static implicit operator Result<T>(T value)
        => Result<T>.Ok(value);
    public static implicit operator Result<T>(Exception err)
        => Result<T>.Fail(err);
}
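As a quick illustration of how such a record can be consumed, here is a minimal sketch. The `ResultDemo` class and its `Describe` method are hypothetical names added for this example. It relies on the `Deconstruct` method the compiler generates for positional records, which is what makes the positional patterns work.

```csharp
#nullable enable
using System;

public record Result<T>(T? result, Exception? error)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) => new(default, exception);
    public static implicit operator Result<T>(T value) => Ok(value);
    public static implicit operator Result<T>(Exception err) => Fail(err);
}

// Hypothetical helper, for illustration only.
public static class ResultDemo
{
    public static string Describe(Result<int> msg) => msg switch
    {
        // Any non-null error means the message is a failure.
        (_, { } err) => $"failed: {err.Message}",
        // Otherwise the error is null and the value is usable.
        var (value, _) => $"ok: {value}"
    };
}
```

Thanks to the implicit operators, `ResultDemo.Describe(42)` returns "ok: 42" and `ResultDemo.Describe(new InvalidOperationException("boom"))` returns "failed: boom".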
The first step is to create a CopyToAsync helper that copies all data from the input IAsyncEnumerable<Result<T>> to an output ChannelWriter<Result<T>>:
public static async Task CopyToAsync<T>(
    this IAsyncEnumerable<Result<T>> input,
    ChannelWriter<Result<T>> output,
    CancellationToken token = default)
{
    try
    {
        await foreach (var msg in input.WithCancellation(token).ConfigureAwait(false))
        {
            await output.WriteAsync(msg).ConfigureAwait(false);
        }
    }
    catch (Exception exc)
    {
        // Turn a source failure into a regular message instead of faulting the task
        await output.WriteAsync(Result<T>.Fail(exc)).ConfigureAwait(false);
    }
}
This way, even if an exception is thrown, a Failure message will be emitted instead of aborting the pipeline.
With that, you can merge multiple sources by copying input messages to an output channel :
public static ChannelReader<Result<T>> Merge<T>(
    this IEnumerable<IAsyncEnumerable<Result<T>>> inputs,
    CancellationToken token = default)
{
    var channel = Channel.CreateBounded<Result<T>>(1);
    // One copy task per source, all writing to the same channel
    var tasks = inputs.Select(inp => inp.CopyToAsync(channel.Writer, token));
    // Complete the channel only after every source has finished
    _ = Task.WhenAll(tasks)
        .ContinueWith(t => channel.Writer.TryComplete(t.Exception));
    return channel.Reader;
}
Using a bounded channel with a capacity of 1 maintains the backpressure behavior of downstream channels or consumers.
You can read all messages in a ChannelReader through ChannelReader<T>.ReadAllAsync(CancellationToken):
IEnumerable<IAsyncEnumerable<Result<T>>> sources = ...;
var merged = sources.Merge();
await foreach (var msg in merged.ReadAllAsync())
{
    // Positional pattern: matches only good results (non-null value, null error)
    if (msg is ({ } value, null))
    {
        // Work with value
    }
}
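To check how this behaves in the scenario from the question, where two sources both throw, here is a self-contained sketch. `MergeDemo`, `FailingSource` and `RunAsync` are hypothetical names added for this example. The copy and merge logic follows the approach described above, written as plain static methods so the block compiles on its own.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public record Result<T>(T? result, Exception? error)
{
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) => new(default, exception);
}

public static class MergeDemo
{
    static async Task CopyToAsync<T>(IAsyncEnumerable<Result<T>> input,
        ChannelWriter<Result<T>> output, CancellationToken token)
    {
        try
        {
            await foreach (var msg in input.WithCancellation(token).ConfigureAwait(false))
                await output.WriteAsync(msg).ConfigureAwait(false);
        }
        catch (Exception exc)
        {
            // A failing source becomes one failure message; other sources keep running.
            await output.WriteAsync(Result<T>.Fail(exc)).ConfigureAwait(false);
        }
    }

    static ChannelReader<Result<T>> Merge<T>(
        IEnumerable<IAsyncEnumerable<Result<T>>> inputs, CancellationToken token = default)
    {
        var channel = Channel.CreateBounded<Result<T>>(1);
        var tasks = inputs.Select(inp => CopyToAsync(inp, channel.Writer, token)).ToList();
        _ = Task.WhenAll(tasks).ContinueWith(t => channel.Writer.TryComplete(t.Exception));
        return channel.Reader;
    }

    // Hypothetical source: yields `count` values starting at `start`, then throws.
    static async IAsyncEnumerable<Result<int>> FailingSource(int start, int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return Result<int>.Ok(start + i);
        }
        throw new InvalidOperationException($"source {start} failed");
    }

    public static async Task<(List<int> Values, List<string> Errors)> RunAsync()
    {
        var merged = Merge(new[] { FailingSource(100, 2), FailingSource(200, 3) });
        var values = new List<int>();
        var errors = new List<string>();
        await foreach (var msg in merged.ReadAllAsync())
        {
            if (msg.error is { } err) errors.Add(err.Message);
            else values.Add(msg.result);
        }
        return (values, errors);
    }
}
```

Awaiting `MergeDemo.RunAsync()` should yield five values and two error messages. The order in which items from the two sources interleave is not deterministic, but neither exception escapes the consumer loop.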
You can avoid exposing the channel by returning IAsyncEnumerable<>:
public static IAsyncEnumerable<Result<T>> MergeAsync<T>(
    this IEnumerable<IAsyncEnumerable<Result<T>>> inputs,
    CancellationToken token = default)
{
    return inputs.Merge(token).ReadAllAsync(token);
}
You can use System.Linq.Async to work on an IAsyncEnumerable<> using LINQ methods, e.g. to convert an IAsyncEnumerable<T> to an IAsyncEnumerable<Result<T>>:
source.Select(msg => Result<T>.Ok(msg))
Or filter failed messages before processing them :
source.Where(msg=>msg.IsOk)
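A plain `Select` only wraps items the source produces successfully; an exception thrown by the source itself still surfaces from the enumeration. As a possible alternative, not part of System.Linq.Async, the sketch below wraps a plain IAsyncEnumerable<T> so that a source failure becomes a final failed message. `AsResults`, `ResultWrapping`, `TwoThenThrow` and `RunAsync` are hypothetical names. The enumerator is driven by hand because C# does not allow `yield return` inside a `try` block that has a `catch` clause.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public record Result<T>(T? result, Exception? error)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) => new(default, exception);
}

public static class ResultWrapping
{
    // Hypothetical helper: wraps items in Ok and turns a source exception into a last Fail.
    public static async IAsyncEnumerable<Result<T>> AsResults<T>(
        this IAsyncEnumerable<T> source,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        await using var e = source.GetAsyncEnumerator(token);
        while (true)
        {
            Result<T> next;
            try
            {
                if (!await e.MoveNextAsync()) break; // source completed normally
                next = Result<T>.Ok(e.Current);
            }
            catch (Exception exc)
            {
                next = Result<T>.Fail(exc);
            }
            // The yield sits outside the try/catch, as the language requires.
            yield return next;
            if (!next.IsOk) yield break; // a faulted source cannot produce more items
        }
    }

    // Hypothetical source: two items, then an exception.
    static async IAsyncEnumerable<int> TwoThenThrow()
    {
        yield return 1;
        await Task.Yield();
        yield return 2;
        throw new InvalidOperationException("source failed");
    }

    public static async Task<List<Result<int>>> RunAsync()
    {
        var seen = new List<Result<int>>();
        await foreach (var msg in TwoThenThrow().AsResults())
            seen.Add(msg);
        return seen;
    }
}
```

Here `RunAsync` collects three messages: two successful results followed by one failure carrying the InvalidOperationException.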
You could create a method that applies a Func<T1,Task<T2>> to an input and propagates results or errors as results:
public static async Task<Result<T2>> ApplyAsync<T1,T2>(this Result<T1> msg,
    Func<T1,Task<T2>> func)
{
    // An incoming failure is passed along without calling func
    if (msg is (_, { } err))
    {
        return err;
    }
    try
    {
        var result = await func(msg.result!).ConfigureAwait(false);
        return result;
    }
    catch (Exception exc)
    {
        return exc;
    }
}
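To show how such a method chains, here is a small self-contained sketch. `ApplyDemo`, `ParseAsync`, `DoubleAsync` and `ParseAndDoubleAsync` are hypothetical names added for this example. Once a step fails, later steps are skipped and the original error travels to the end of the chain.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

public record Result<T>(T? result, Exception? error)
{
    public bool IsOk => error == null;
    public static Result<T> Ok(T result) => new(result, default);
    public static Result<T> Fail(Exception exception) => new(default, exception);
    public static implicit operator Result<T>(T value) => Ok(value);
    public static implicit operator Result<T>(Exception err) => Fail(err);
}

public static class ApplyDemo
{
    public static async Task<Result<T2>> ApplyAsync<T1, T2>(
        this Result<T1> msg, Func<T1, Task<T2>> func)
    {
        // Failures bypass func and are forwarded unchanged.
        if (msg is (_, { } err)) return err;
        try
        {
            return await func(msg.result!).ConfigureAwait(false);
        }
        catch (Exception exc)
        {
            return exc;
        }
    }

    // Hypothetical processing steps.
    static Task<int> ParseAsync(string text) => Task.FromResult(int.Parse(text));
    static Task<int> DoubleAsync(int n) => Task.FromResult(n * 2);

    public static async Task<Result<int>> ParseAndDoubleAsync(string text)
    {
        Result<string> input = text;                                // implicit Ok
        var parsed = await input.ApplyAsync(s => ParseAsync(s));    // may fail
        return await parsed.ApplyAsync(n => DoubleAsync(n));        // skipped on failure
    }
}
```

With this sketch, `ParseAndDoubleAsync("21")` produces a successful result holding 42, while `ParseAndDoubleAsync("abc")` produces a failed result whose error is the FormatException thrown by `int.Parse`.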
This is a ... bit ... easier in F#