I am trying to use Rx (.NET) for a project and I have an issue with how to properly dispose resources that are created during Observable.Create() and emitted with OnNext(). My setup looks like this (shortened to the relevant bits only, hopefully):
    var obs = Observable.Create<ReactiveRunData>(async (o) =>
        {
            if (someCondition)
            {
                RunData runData = await CreateRunData(); // RunData is IDisposable, needs to be disposed
                o.OnNext(runData);
            }
            o.OnCompleted();
            return Disposable.Empty;
        })
        .Concat(Observable.Empty<RunData>().Delay(TimeSpan.FromSeconds(2)))
        .Repeat() // Resubscribe indefinitely after source completes
        .Publish().RefCount() // see http://northhorizon.net/2011/sharing-in-rx/
        ;
This is my implementation of an observable collection that's infinite and produces an item (of type RunData) every 2 seconds.
Then I do the actual reactive stuff by transforming the IObservable stream using all kinds of operators:
    var final = obs
        .Select(runData => ...)
        // lots of other operations
        .Select(tuple => (tuple.runData, tuple.result));
The final observable returns a tuple (RunData, Result).
When subscribing to that observable, at the end I explicitly dispose of the RunData instances, like so:
    final.Subscribe(
        async (tuple) =>
        {
            var (runData, result) = tuple;
            try
            {
                // ... do something with result and runData, await some stuff, ...
            } catch (Exception e)
            {
                // error handling
            } finally
            {
                // dispose of runData
                runData.Dispose();
            }
        },
        (Exception e) =>
        {
            // error handling
        });
I suspect this implementation is leaky in various ways. For example, when exceptions are thrown from different places, I believe that in some cases the RunData instance won't get disposed; it is just gone, replaced by an exception travelling through the pipe.
I believe I'd also run into problems if I added a second subscriber to my observable, right? I don't need more than one subscriber, but that also makes me question my implementation.
I feel like the whole idea of passing data that needs to be disposed by subscribers is wrong, but I couldn't come up with a better way. I tried using Observable.Using(), but afaik that only disposes the resource when the sequence ends, which in my case is never. And I really need an infinite sequence because I want to be able to use functionality like Scan() to reference previous data to build intermediate data structures over time.
I also tried using the callback that is returned from the lambda of Observable.Create(), but that fires as soon as Observable.Create() is done and not after the subscriber is done, so that led to race conditions (but I'm not even sure if I understood that correctly, Rx + async is tricky).
So... how can I implement this properly?
For some background info, RunData includes (among other things) a DB transaction and an Autofac LifetimeScope, both of which I want to use throughout the pipeline, and both of which need to be disposed at the end, after the subscriber is done.
Your code has several issues that prevent it from being safe to use in a production environment. I'll try to go through them all, but I'll hit the biggest design issue first.
When using IDisposable objects in C#, you have two ways to manage lifetime.
1. The code that creates the IDisposable must call Dispose(), or
2. The code is a factory method, and the caller that receives the IDisposable must call Dispose().
That isn't specific to observables; it applies whenever you create disposables in C#.
Now, when an observable returns disposables, you are effectively writing a factory method, so the creator cannot call Dispose().
But, you have no control of how many subscribers there are for your observable, so none of the consumers (observers/subscriptions) can know when they are done with the disposable.
The bottom line is that you CANNOT safely manage disposables if they are the output of an observable. DO NOT DO IT.
Rx does offer a safe way of dealing with disposables that allows the pipeline to effectively manage the disposables' lifetime, and that's with the Using operator. That's what you should use.
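As a minimal sketch of what that means for an infinite sequence: Using ties the resource to the subscription, so the resource is released when the subscription is disposed, not only when the sequence completes. FakeResource is a hypothetical stand-in invented for this illustration; it is not part of Rx or of the code in the question.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical resource; created up front only so the sketch can inspect it afterwards.
var resource = new FakeResource();

// A never-ending sequence that holds the resource for as long as it is subscribed.
IObservable<int> source = Observable.Using(
    () => resource,                    // resource factory, called once per subscription
    r => Observable.Never<int>());     // infinite sequence that would use the resource

IDisposable subscription = source.Subscribe(_ => { });
bool disposedWhileSubscribed = resource.IsDisposed;

subscription.Dispose();                // ending the subscription releases the resource
bool disposedAfterUnsubscribe = resource.IsDisposed;

Console.WriteLine($"{disposedWhileSubscribed} {disposedAfterUnsubscribe}"); // prints "False True"

// Hypothetical stand-in for a disposable resource such as RunData.
sealed class FakeResource : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}
```

The same thing happens if the inner sequence completes or fails: the pipeline, not the subscriber, ends up calling Dispose().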
Before I give an example, I'll just cover the three other issues I see in your code.
1. The if (someCondition) check. External state is bad in observables and you should learn to always avoid it. It's hard to tell what you're using this for in your code, so I'll ignore this issue for now.
2. The async in the Create call. While that's OK for Create, as there is a Func<IObserver<T>, Task<IDisposable>> overload, I'm always suspicious of this kind of code, because an async lambda passed where an Action is expected (a Func<Task> turned into an Action) runs as a fire-and-forget call, and those are hard to debug. Try to avoid it!
3. The Disposable.Empty inside the Create. Whenever you do this it is likely that you're doing something wrong. Often this creates code that you cannot cancel and/or that creates deadlocks. There are very few times that you need to resort to Create in this way. Try to avoid it!
From your code in the question it appears that RunData produces a result. I'll assume that there is a GetResult() method on RunData that produces a value of type Result.
To start, since CreateRunData is async, you should add this handy helper method:
    public static class ObservableEx
    {
        public static IObservable<T> Using<T, R>(Func<Task<R>> resourceFactory, Func<R, IObservable<T>> observableFactory) where R : IDisposable =>
            from r in Observable.FromAsync(resourceFactory)
            from t in Observable.Using(() => r, observableFactory)
            select t;
    }
Here are the two ways that I would write your code.
(1)
    IObservable<Result> obs =
        ObservableEx
            .Using(
                () => CreateRunData(),
                rd =>
                    Observable
                        .Interval(TimeSpan.FromSeconds(2.0))
                        .Where(t => someCondition) // don't do this
                        .Select(t => rd.GetResult()))
            .Publish()
            .RefCount();
This approach creates one instance of your resource and continues to use it until all of your subscribers have disposed of their subscriptions.
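To make that lifetime concrete, here is a small sketch of approach (1) under some assumptions: a Subject stands in for Observable.Interval so that ticks can be pushed by hand, FakeRunData is a hypothetical stand-in for RunData whose GetResult() just counts calls, the someCondition filter is left out, and the helper is repeated so the block is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

var ticks = new Subject<long>();       // manual stand-in for Observable.Interval
var runData = new FakeRunData();       // hypothetical stand-in for RunData
var results = new List<int>();

IObservable<int> obs =
    ObservableEx
        .Using(
            () => Task.FromResult(runData),            // stand-in for CreateRunData()
            rd => ticks.Select(_ => rd.GetResult()))   // one resource shared by every tick
        .Publish()
        .RefCount();

IDisposable subscription = obs.Subscribe(r => results.Add(r));
ticks.OnNext(0);
ticks.OnNext(1);
bool disposedWhileSubscribed = runData.IsDisposed;

subscription.Dispose();                // last subscriber leaves, RefCount disconnects
bool disposedAfterUnsubscribe = runData.IsDisposed;

Console.WriteLine($"{results.Count} results, disposed: {disposedWhileSubscribed} then {disposedAfterUnsubscribe}");

// The helper from this answer, repeated for self-containment.
static class ObservableEx
{
    public static IObservable<T> Using<T, R>(Func<Task<R>> resourceFactory, Func<R, IObservable<T>> observableFactory) where R : IDisposable =>
        from r in Observable.FromAsync(resourceFactory)
        from t in Observable.Using(() => r, observableFactory)
        select t;
}

// Hypothetical stand-in for RunData; GetResult() returns a running call count.
sealed class FakeRunData : IDisposable
{
    private int _calls;
    public bool IsDisposed { get; private set; }
    public int GetResult() => ++_calls;
    public void Dispose() => IsDisposed = true;
}
```

With a hand-driven Subject and an already-completed task everything runs on the calling thread, so the resource should stay alive across both ticks and be disposed as soon as the only subscription is disposed.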
(2)
    IObservable<Result> obs =
        Observable
            .Interval(TimeSpan.FromSeconds(2.0))
            .Where(t => someCondition) // don't do this
            .SelectMany(t =>
                ObservableEx
                    .Using(
                        () => CreateRunData(),
                        rd => Observable.Return(rd.GetResult())))
            .Publish()
            .RefCount();
This version creates a new resource for each tick of the Interval.
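A comparable sketch for approach (2), again under assumptions: a Subject replaces Observable.Interval, FakeRunData is a hypothetical stand-in that reports its own disposal through a callback, GetResult() returns a fixed value, and the helper is repeated so the block is self-contained. It counts how many resources are created and disposed while the subscription is still active.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

var ticks = new Subject<long>();       // manual stand-in for Observable.Interval
int created = 0, disposed = 0;
var results = new List<int>();

IObservable<int> obs =
    ticks
        .SelectMany(t =>
            ObservableEx.Using(
                // stand-in for CreateRunData(): a fresh resource for every tick
                () => { created++; return Task.FromResult(new FakeRunData(() => disposed++)); },
                rd => Observable.Return(rd.GetResult())))
        .Publish()
        .RefCount();

IDisposable subscription = obs.Subscribe(r => results.Add(r));
ticks.OnNext(0);
ticks.OnNext(1);
ticks.OnNext(2);

// The subscription is still active here; each inner sequence has already ended.
Console.WriteLine($"created={created} disposed={disposed} results={results.Count}");
subscription.Dispose();

// The helper from this answer, repeated for self-containment.
static class ObservableEx
{
    public static IObservable<T> Using<T, R>(Func<Task<R>> resourceFactory, Func<R, IObservable<T>> observableFactory) where R : IDisposable =>
        from r in Observable.FromAsync(resourceFactory)
        from t in Observable.Using(() => r, observableFactory)
        select t;
}

// Hypothetical stand-in for RunData that reports its disposal exactly once.
sealed class FakeRunData : IDisposable
{
    private readonly Action _onDispose;
    private bool _isDisposed;
    public FakeRunData(Action onDispose) => _onDispose = onDispose;
    public int GetResult() => 42;
    public void Dispose()
    {
        if (_isDisposed) return;
        _isDisposed = true;
        _onDispose();
    }
}
```

Because each inner Using sequence completes right after it yields its single result, the created and disposed counts should match after every tick, even though the outer sequence never ends.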
Should GetResult actually be async, then do these replacements:
(1)
Replace .Select(t => rd.GetResult())) with .SelectMany(t => Observable.FromAsync(() => rd.GetResultAsync()))).
(2)
Replace rd => Observable.Return(rd.GetResult()))) with rd => Observable.FromAsync(() => rd.GetResultAsync()))).
Let me know if this helps.