I have an observable that I would like to subscribe to with an async method. However, each time the async method throws an exception, the subscription is disposed immediately, even though I put the catch code in the observable definition. The following pseudocode demonstrates the situation:
[Fact]
public async Task Test()
{
    var observable = Observable.Create<int>(observer =>
    {
        try
        {
            Enumerable.Range(1, 10).ToList().ForEach(x =>
            {
                observer.OnNext(x);
            });
        }
        catch (Exception ex)
        {
            // get called after the exception is thrown
            _testOutputHelper.WriteLine($"The exception is catch:{ex.ToString()}");
        }
        return Disposable.Create(() =>
        {
            // also get called after exception is thrown
            _testOutputHelper.WriteLine("Observable Dispose");
        });
    });
    Func<int, Task> handler = async (i) =>
    {
        // simulate the handler logic
        await Task.Delay(TimeSpan.FromSeconds(1));
        // throw the exception to test
        throw new Exception($"{i}");
    };
    observable.Subscribe(x => handler(x).Wait());
    await Task.Delay(TimeSpan.FromSeconds(10));
}
From the code above, I don't understand why the dispose delegate gets called even though the exception is caught. (For some reason, I have to deal with the exception inside the observable definition.) Is there any way to prevent the subscription from being disposed when an exception is thrown from the async method?
What's happening in your code is a direct consequence of using Observable.Create and filling the observable with this code:
Enumerable.Range(1, 10).ToList().ForEach(x =>
{
    observer.OnNext(x);
});
Observable.Create uses the current thread to create the observable, so the Enumerable.Range(1, 10).ToList().ForEach call executes immediately on the current thread, and each call to OnNext runs handler(x).Wait() immediately.
You'll note, though, that the exception occurs in the delegate passed to Subscribe. Internally there is code like this:
catch (Exception exception)
{
    if (!autoDetachObserver.Fail(exception))
    {
        throw;
    }
    return autoDetachObserver;
}
That catches the exception in the subscribe call, cancels the subscription (hence the "Observable Dispose" message), and then rethrows the exception, which is where your code catches it.
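To illustrate that the teardown is triggered by the exception escaping the delegate passed to Subscribe, here is a minimal sketch, assuming it is acceptable to catch at the subscriber (the question suggests it may not be). The names Program and Handler and the shortened delay are illustrative and not from the original code. Because Task.Wait() wraps failures in an AggregateException, that is the type caught here.

```csharp
using System;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical stand-in for the handler in the question (shorter delay).
    private static async Task Handler(int i)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(100));
        throw new Exception($"{i}");
    }

    public static void Main()
    {
        var observable = Observable.Create<int>(observer =>
        {
            foreach (var x in Enumerable.Range(1, 10))
            {
                observer.OnNext(x);
            }
            return Disposable.Create(() => Console.WriteLine("Observable Dispose"));
        });

        observable.Subscribe(x =>
        {
            try
            {
                // Blocking wait, as in the question.
                Handler(x).Wait();
            }
            catch (AggregateException ex)
            {
                // Nothing escapes the OnNext delegate, so Rx never sees a failure.
                Console.WriteLine($"Handled in subscriber: {ex.InnerException?.Message}");
            }
        });
    }
}
```

Since no exception leaves the OnNext delegate in this sketch, Rx has no reason to detach the observer, and every value from 1 to 10 reaches the handler.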
Now, if you wanted to do this properly in Rx, you'd avoid Observable.Create. It's a tempting way to create observables, but it leads to trouble.
Instead do this:
public async Task Test()
{
    Func<int, Task> handler = async (i) =>
    {
        // simulate the handler logic
        await Task.Delay(TimeSpan.FromSeconds(1));
        // throw the exception to test
        throw new Exception($"{i}");
    };
    await
        Observable
            .Range(1, 10)
            .SelectMany(i => Observable.FromAsync(() => handler(i)))
            .LastOrDefaultAsync();
}
But, of course, we want to handle the exception. The simple way is like this:
public async Task Test()
{
    Func<int, Task> handler = async (i) =>
    {
        // simulate the handler logic
        await Task.Delay(TimeSpan.FromSeconds(1));
        // throw the exception to test
        throw new Exception($"{i}");
    };
    await
        Observable
            .Range(1, 10)
            .SelectMany(i =>
                Observable
                    .FromAsync(() => handler(i))
                    .Catch<Unit, Exception>(ex =>
                    {
                        Console.WriteLine($"The exception is catch:{ex.ToString()}");
                        return Observable.Empty<Unit>();
                    }))
            .LastOrDefaultAsync();
}
That now outputs all 10 exception messages and completes normally.
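One thing to be aware of is that SelectMany subscribes to each inner observable as soon as its value arrives, so the handlers above can overlap in time. If the handlers need to run one at a time and in order, a possible variant is to project with Select and flatten with Concat instead. This is only a sketch under that assumption, not something the original code requires; Program, Handler, and the shortened delay are illustrative names and values.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical stand-in for the handler in the question (shorter delay).
    private static async Task Handler(int i)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(100));
        throw new Exception($"{i}");
    }

    public static async Task Main()
    {
        await Observable
            .Range(1, 10)
            .Select(i =>
                Observable
                    // FromAsync is deferred: the handler starts on subscription.
                    .FromAsync(() => Handler(i))
                    // Swallow the failure of this one item and continue.
                    .Catch<Unit, Exception>(ex =>
                    {
                        Console.WriteLine($"Caught: {ex.Message}");
                        return Observable.Empty<Unit>();
                    }))
            // Concat subscribes to the next inner observable only after
            // the previous one has completed.
            .Concat()
            .LastOrDefaultAsync();
    }
}
```

The trade-off is throughput: with Concat the total running time grows with the number of items, whereas the SelectMany version lets the handlers run concurrently.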