Here is a sample program that makes two subscriptions to console input (source observable is not relevant here). In the first subscription it uses Observable.SelectMany and in the second one a similar SelectMany operator that internally uses System.Threading.Tasks.Dataflow package. An exception is thrown for certain inputs in each of them. The exception is correctly forwarded to Observer onError which rethrows it in the default Subscribe implementation. The observed behaviour is that in case of an exception in SelectMany the process keeps running and in case of an exception in SelectManyPreseveOrder the process is terminated with an unhandled exception. What is the cause of the different behaviour? Is there a way to achieve the 'more friendly' behaviour in the SelectManyPreserveOrder operator? It's a .net 4.6.1 console application using Rx.Linq 2.2.5 and System.Threading.Tasks.Dataflow 4.10.0:
class Program
{
static async Task Main()
{
AppDomain.CurrentDomain.UnhandledException += (sender, args) => Console.WriteLine("App domain unhandled exception");
TaskScheduler.UnobservedTaskException += (sender, args) => Console.WriteLine("Unobserved task exception");
var consoleInput = Helper.ConsoleInput();
consoleInput.SelectMany(async input =>
{
await Task.Delay(50).ConfigureAwait(false);
if (input == "1")
throw new Exception("This exception is swallowed");
return input;
})
.Subscribe(s => Console.WriteLine($"SelectMany: {s}"));
consoleInput.SelectManyPreserveOrder(async input =>
{
await Task.Delay(50).ConfigureAwait(false);
if (input == "2")
throw new Exception("This exception kills the process");
return input;
})
.Subscribe(s => Console.WriteLine($"SelectMany (TPL Dataflow): {s}"));
await Task.Delay(TimeSpan.FromMinutes(10)).ConfigureAwait(false);
}
}
public static class ObservableExtension
{
public static IObservable<TResult> SelectManyPreserveOrder<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector, int maxParallelBatches = 1)
{
return source.FromTplDataflow(() =>
new TransformBlock<TSource, TResult>(selector,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelBatches }));
}
public static IObservable<TResult> FromTplDataflow<T, TResult>(
this IObservable<T> source, Func<IPropagatorBlock<T, TResult>> blockFactory)
{
return Observable.Defer(() =>
{
var block = blockFactory();
return Observable.Using(() =>
{
var disposable = source.Subscribe(block.AsObserver());
return Disposable.Create(dispose: () => disposable.Dispose());
}, r => block.AsObservable());
});
}
}
public static class Helper
{
public static IObservable<string> ConsoleInput()
{
return
Observable
.FromAsync(() => Console.In.ReadLineAsync())
.Repeat()
.Publish()
.RefCount()
.SubscribeOn(Scheduler.Default);
}
}
Interestingly enough the UnobservedTaskException handler is never called.
The exception is being thrown here, but it's being thrown in an unobserved Task continuation. In .NET 4.5 and above unobserved task exceptions will be handled by the runtime automatically. Here's a good article by Stephen Toub talking about that change.
The important bit:
To make it easier for developers to write asynchronous code based on Tasks, .NET 4.5 changes the default exception behavior for unobserved exceptions. While unobserved exceptions will still cause the UnobservedTaskException event to be raised (not doing so would be a breaking change), the process will not crash by default. Rather, the exception will end up getting eaten after the event is raised, regardless of whether an event handler observes the exception.