Search code examples
c#.netasync-awaitreactive-programmingsystem.reactive

How to handle the exception thrown by the async method with observable?


I have an observable and I would like to subscribe this observable with an async method, however each time the exception thrown by the async method, the subscription disposed immediately even if I put the catch code in the observable definition. The pseudo code as follow to demonstrate this situation:

[Fact]
public async Task Test()
{
    var observable = Observable.Create<int>(observer =>
    {
        try
        {
            Enumerable.Range(1, 10).ToList().ForEach(x =>
            {
                observer.OnNext(x);
            });
        }
        catch (Exception ex)
        {
           // get called after the exception is thrown 
            _testOutputHelper.WriteLine($"The exception is catch:{ex.ToString()}"); 
        }
        return Disposable.Create(() =>
        {
           // also get called after exception is thrown
            _testOutputHelper.WriteLine("Observable Dispose"); 
        });
    });

    Func<int, Task> handler = async (i) =>
     {
         // simulate the handler logic
         await Task.Delay(TimeSpan.FromSeconds(1));
         // throw the exception to test 
         throw new Exception($"{i}");
     };

    observable.Subscribe(x=>handler(x).Wait());

    await Task.Delay(TimeSpan.FromSeconds(10));
}

From above code, I don`t understand why the dispose delegate get called even the exception is catch (For some reason, I have to deal with the exception inside the observable definition), Is there any way to prevent the subscription being disposed when the exception thrown from async method?


Solution

  • What's happening in your code is a direct consequence of you using Observable.Create and filling the observable with this code:

    Enumerable.Range(1, 10).ToList().ForEach(x =>
    {
        observer.OnNext(x);
    });
    

    Observable.Create uses the current thread to create the observable, so the Enumerable.Range(1, 10).ToList().ForEach executes immediately on the current thread and the call to OnNext executes the handler(x).Wait() immediately.

    You'll note, though, that the exception occurs in the delegate passed to the Subscribe. Internally there is code like this:

    catch (Exception exception)
    {
        if (!autoDetachObserver.Fail(exception))
        {
            throw;
        }
        return autoDetachObserver;
    }
    

    That catches the exception in the subscribe, cancels the subscription - hence the "Observable Dispose" message - and then rethrows the exception and that's where your code catches it.

    Now, if you wanted to do this properly in Rx, you'd avoid Observable.Create. It's a tempting way to create observables, but it leads to trouble.

    Instead do this:

    public async Task Test()
    {
        Func<int, Task> handler = async (i) =>
         {
             // simulate the handler logic
             await Task.Delay(TimeSpan.FromSeconds(1));
             // throw the exception to test 
             throw new Exception($"{i}");
         };
     
        await
            Observable
                .Range(1, 10)
                .SelectMany(i => Observable.FromAsync(() => handler(i)))
                .LastOrDefaultAsync();
    }
    

    But, of course, we want to handle the exception. The simple way is like this:

    public async Task Test()
    {
        Func<int, Task> handler = async (i) =>
         {
             // simulate the handler logic
             await Task.Delay(TimeSpan.FromSeconds(1));
             // throw the exception to test 
             throw new Exception($"{i}");
         };
     
        await
            Observable
                .Range(1, 10)
                .SelectMany(i =>
                    Observable
                        .FromAsync(() => handler(i))
                        .Catch<Unit, Exception>(ex =>
                        {
                            Console.WriteLine($"The exception is catch:{ex.ToString()}");
                            return Observable.Empty<Unit>();
                        }))
                .LastOrDefaultAsync();
    }
    

    That now outputs the 10 exception errors and completes normally.