c# .net error-handling reactive-programming system.reactive

Exception handling in observable pipeline


I have created an observable pipeline in which each item is transformed into another by running an async method.

IObservable<Summary> obs = scanner.Scans
                    .SelectMany(b => GetAssignment(b))
                    .SelectMany(b => VerifyAssignment(b))
                    .SelectMany(b => ConfirmAssignmentData(b))
                    .SelectMany(b => UploadAsset(b))
                    .Select(assignment => new Summary())
                    .Catch(LogException());

I would like to make this fail-proof: if an exception is thrown during processing, it should be logged and then ignored, and the pipeline should resume with the next scan (the next item pushed by scanner.Scans).

The current code catches any exception, but the sequence finishes as soon as an exception is thrown.

How can I make it "swallow" the exception (after logging it) and resume with the next item?


Solution

  • Rx follows a functional paradigm, so a functional approach suits this problem well.

    The answer is to introduce another monad that can cope with errors. Just as Nullable<T> lets a value type such as int represent a missing value, this is a class that holds either a value or an exception.

    public class Exceptional
    {
        public static Exceptional<T> From<T>(T value) => new Exceptional<T>(value);
        public static Exceptional<T> From<T>(Exception ex) => new Exceptional<T>(ex);
        public static Exceptional<T> From<T>(Func<T> factory) => new Exceptional<T>(factory);
    }
    
    public class Exceptional<T>
    {
        public bool HasException { get; private set; }
        public Exception Exception { get; private set; }
        public T Value { get; private set; }
    
        public Exceptional(T value)
        {
            this.HasException = false;
            this.Value = value;
        }
    
        public Exceptional(Exception exception)
        {
            this.HasException = true;
            this.Exception = exception;
        }
    
        public Exceptional(Func<T> factory)
        {
            try
            {
                this.Value = factory();
                this.HasException = false;
            }
            catch (Exception ex)
            {
                this.Exception = ex;
                this.HasException = true;
            }
        }
    
        public override string ToString() =>
            this.HasException
                ? this.Exception.GetType().Name
                : (this.Value != null ? this.Value.ToString() : "null");
    }
    
    
    public static class ExceptionalExtensions
    {
        public static Exceptional<T> ToExceptional<T>(this T value) => Exceptional.From(value);
    
        public static Exceptional<T> ToExceptional<T>(this Func<T> factory) => Exceptional.From(factory);
    
        public static Exceptional<U> Select<T, U>(this Exceptional<T> value, Func<T, U> m) =>
            value.SelectMany(t => Exceptional.From(() => m(t)));
    
        public static Exceptional<U> SelectMany<T, U>(this Exceptional<T> value, Func<T, Exceptional<U>> k) =>
            value.HasException ? Exceptional.From<U>(value.Exception) : k(value.Value);
    
        public static Exceptional<V> SelectMany<T, U, V>(this Exceptional<T> value, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
            value.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
    }
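
    Before bringing Rx in, it may help to see the short-circuiting on its own. The following is a minimal sketch that assumes the Exceptional types above are compiled into the same project; ExceptionalDemo and Run are hypothetical names used only for illustration.

    ```csharp
    using System;

    public static class ExceptionalDemo
    {
        // Hypothetical demo: returns one successful and one failed chain.
        public static (Exceptional<int> Ok, Exceptional<int> Failed) Run()
        {
            // Every step succeeds: 100 / 20 = 5, then 5 + 1 = 6.
            Exceptional<int> ok = 20.ToExceptional()
                .Select(x => 100 / x)
                .Select(x => x + 1);

            // The division throws; Select captures the exception and the
            // following Select is skipped instead of being executed.
            Exceptional<int> failed = 0.ToExceptional()
                .Select(x => 100 / x)
                .Select(x => x + 1);

            return (ok, failed);
        }
    }
    ```

    Here ok holds the value 6, while failed has HasException set to true and carries the DivideByZeroException, so nothing is thrown to the caller.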
    

    So, let's start by creating an Rx query that throws an exception.

    IObservable<int> query =
        Observable
            .Range(0, 10)
            .Select(x => 5 - x)
            .Select(x => 100 / x)
            .Select(x => x + 5);
    

    If I run the observable, it emits five values and then terminates with a DivideByZeroException as soon as 5 - x reaches zero:

    [Image: Normal Query]

    Let's transform this with Exceptional and see how it allows us to continue processing when an error occurs.

    IObservable<Exceptional<int>> query =
        Observable
            .Range(0, 10)
            .Select(x => x.ToExceptional())
            .Select(x => x.Select(y => 5 - y))
            .Select(x => x.Select(y => 100 / y))
            .Select(x => x.Select(y => y + 5));
    

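    Now when I run it, all ten items come through; the division by zero shows up as an Exceptional carrying the exception instead of terminating the sequence:

    [Image: Query with Exceptional]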

    Now I can test each result, check whether HasException is true, and log the exception if so, while the observable carries on with the next item.
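
    One way to do that check is sketched below. It assumes the Exceptional types above are in scope; SubscribeLogging is a hypothetical helper name, not part of Rx, and the logging action is whatever the caller supplies.

    ```csharp
    using System;

    public static class ExceptionalLogging
    {
        // Hypothetical helper: routes failures to `log` and successful
        // values to `onValue`. The subscription stays alive either way,
        // because a failure arrives as an ordinary OnNext item.
        public static IDisposable SubscribeLogging<T>(
            this IObservable<Exceptional<T>> source,
            Action<T> onValue,
            Action<Exception> log) =>
            source.Subscribe(result =>
            {
                if (result.HasException)
                    log(result.Exception);
                else
                    onValue(result.Value);
            });
    }
    ```

    With the query above this could be called as query.SubscribeLogging(v => Console.WriteLine(v), ex => Console.WriteLine(ex.Message)).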

    It's also easy to clean up the query so that it looks almost the same as the original by introducing one further extension method.

    public static IObservable<Exceptional<U>> Select<T, U>(this IObservable<Exceptional<T>> source, Func<T, U> m) =>
        source.Select(x => x.SelectMany(y => Exceptional.From(() => m(y))));
    

    This combines observables and exceptionals into a single Select operator.

    Now the query can look like this:

    IObservable<Exceptional<int>> query =
        Observable
            .Range(0, 10)
            .Select(x => x.ToExceptional())
            .Select(x => 5 - x)
            .Select(x => 100 / x)
            .Select(x => x + 5);
    

    I get the same result as before.


    Finally, I could get this all working with query syntax by adding two more extension methods:

    public static IObservable<Exceptional<U>> SelectMany<T, U>(this IObservable<T> source, Func<T, Exceptional<U>> k) =>
        source.Select(t => k(t));
    
    public static IObservable<Exceptional<V>> SelectMany<T, U, V>(this IObservable<T> source, Func<T, Exceptional<U>> k, Func<T, U, V> m) =>
        source.SelectMany(t => k(t).SelectMany(u => Exceptional.From(() => m(t, u))));
    

    This allows:

    IObservable<Exceptional<int>> query =
        from n in Observable.Range(0, 10)
        from x in n.ToExceptional()
        let a = 5 - x
        let b = 100 / a
        select b + 5;
    

    Again, I get the same results as before.
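
    The examples above use synchronous selectors, whereas the pipeline in the question runs async methods. The same idea can be carried over with a small helper that awaits a task and captures its outcome. This is only a sketch under assumptions: ExceptionalAsync.FromAsync and ProcessAsync are hypothetical names, ProcessAsync stands in for the question's chain of async steps (assumed to return Task<T>), and the Exceptional types above are assumed to be in scope.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using System.Threading.Tasks;

    public static class ExceptionalAsync
    {
        // Hypothetical helper: awaits the task and wraps either its
        // result or the exception it threw in an Exceptional<T>.
        public static async Task<Exceptional<T>> FromAsync<T>(Func<Task<T>> factory)
        {
            try
            {
                return new Exceptional<T>(await factory());
            }
            catch (Exception ex)
            {
                return new Exceptional<T>(ex);
            }
        }
    }

    public static class AsyncDemo
    {
        // Hypothetical stand-in for the question's async processing steps.
        private static async Task<int> ProcessAsync(int n)
        {
            await Task.Yield();
            return 100 / (5 - n); // throws DivideByZeroException when n == 5
        }

        // Each item is processed independently, so a failure becomes one
        // Exceptional item and the remaining items are still processed.
        public static async Task<IList<Exceptional<int>>> RunAsync() =>
            await Observable
                .Range(0, 10)
                .SelectMany(n => ExceptionalAsync.FromAsync(() => ProcessAsync(n)))
                .ToList();
    }
    ```

    Because the tasks may complete in any order, the results are not guaranteed to arrive in source order; each result can still be checked with HasException and logged as described earlier.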