I'm using the latest Reactive Extensions and have run into a design issue:
What should happen if I throw an exception from the delegate I pass into Subscribe?
Via source stepping, I have found:
Where
) dispose the subscription as the exception passes through them.So of course I've been finding that anywhere I pass an observable through a standard RX operator, any exception causes my events to stop right there due to the disposal. At least, unless I re-subscribe.
This is leading me to question my design. Is it a bad idea to throw exceptions from my delegates? Clearly the RX team thinks so. (Though I'd question whether silently disposing the "bad" subscription is the right way to go.)
Looking at my design, though, I don't see why it's a problem. I have some protected operations going on, I start firing some OnNext's to notify listeners (we have fully switched over from old skool .NET events to Observables), if anything goes wrong in there, it will throw up the stack until it hits a handler. In my case, the handler rolls back a transaction it's working on, which also notifies the listeners of the rollback. It's all exception-safe, works fine. At least, it works fine without the Dispose going on in the Where operator's Producer base.
Going a little further.. is it inconsistent that Subject and peers don't do this behavior? And for our own ISubjects and observable operators that we've written here, should we be doing this same dispose-on-exception behavior?
I'm looking forward to any insights!
Throw exceptions when there are problems. Catch them when you think you can deal with the problems they indicate. Language support for exception handling assumes you have a function call stack and so the exception just climbs the stack looking for a handler.
But remember when using Rx, the processing model has been turned sideways. You no longer have a deep function call stack with the source (calling code) at the top and the observer (called code) at the bottom. So you cannot rely on the language to do the right thing within your Rx stream.
If your callback throws an exception then Rx tends to capture that exception and pass it down through the OnError
channel of the observable. If you do not supply an OnError
handler when you subscribe, then Rx tends to raise the exception on a background thread, generating an unhandled exception in your application.
Rx does not notify the datasource that something downstream has had an exception. This is because the datasource is completely isolated from the data consumers. It might not be on the same process or even the same machine or written in the same language. That is why the Subject
does not do anything. The Subject
is acting as the data source in this case and doesn't really care about what observers do.
As you've noted, uncaught exceptions will cause Rx to unsubscribe your observer from the observable by default. This is the fail-fast philosophy and is the only safe assumption Rx can make. If your observer raised an exception then something must be wrong and we should not give it more data by default. Rx provides mechanisms that let you handle exceptions in ways that do not necessarily unsubscribe from the observable.
One way is to turn your exceptions into data:
// instead of:
source.Where(foo => predicateThatMightThrowException(foo)).Subscribe(foo => ..., error => ...)
// do:
source.Select(foo =>
{
try { return new { foo: foo, filter: predicateThatMightThrowException(foo), error: (Exception)null }; }
catch (Exception e) { return { foo: foo, filter: true, error: e } };
})
.Where(f => f.filter)
.Subscribe(f =>
{
if (f.error != null) { handle error }
else { handle f.foo }
});
Other ways involve using CatchException
or Retry
. The Rxx library has some paired observables with left/right channels and an Either
type that you can use to stream your good data on the "left" and your errors on the "right".