Search code examples
c#.netobservablesystem.reactive

Rx.Net - disposing of resources created during Observable.Create()


I am trying to use Rx(.net) for a project and I have an issue with how to properly dispose resources that are created during Observable.Create() and emitted with OnNext(). My setup looks like this (shortened to the relevant bits only, hopefully):

var obs = Observable.Create<ReactiveRunData>(async (o) =>
  {
    if (someCondition)
    {
      RunData runData = await CreateRunData(); // RunData is IDisposable, needs to be disposed
      o.OnNext(runData);
    }

    o.OnCompleted();

    return Disposable.Empty;
  })
  .Concat(Observable.Empty<RunData>().Delay(TimeSpan.FromSeconds(2)))
  .Repeat() // Resubscribe indefinitely after source completes
  .Publish().RefCount() // see http://northhorizon.net/2011/sharing-in-rx/
  ;

This is my implementation of an observable collection that's infinite and produces an item (of type RunData) every 2 seconds.

Then I do the actual reactive stuff by transforming the IObservable stream using all kinds of operators:

var final = obs
    .Select(runData => ...)
    // lots of other operations
    .Select(tuple => (tuple.runData, tuple.result));`

The final observable returns a tuple (RunData, Result).

When subscribing to that observable, at the end I explicitly dispose of the RunData instances, like so:

final.Subscribe(
    async (tuple) =>
    {
        var (runData, result) = tuple;
        try
        {
            // ... do something with result and runData, await some stuff, ...
            
        } catch (Exception e)
        {
            // error handling
        } finally 
        {
            // dispose of runData
            runData.Dispose();
        }
    },
    (Exception e) =>
    {
        // error handling
    });

I suspect this implementation is leaky in various ways, such as when exceptions are thrown from different places, in some cases of which I believe the RunData instance won't get disposed, but is just gone, replaced by an exception travelling through the pipe.

I believe I'd also run into problems if I would add a second subscriber to my observable, right? I don't need more than one subscriber, but that also makes me question my implementation.

I feel like the whole idea of passing data that needs to be disposed by subscribers is wrong, but I couldn't come up with a better way. I tried using Observable.Using(), but afaik that only disposes the resource when the sequence ends, which in my case is never. And I really need an infinite sequence because I want to be able to use functionality like Scan() to reference previous data to build intermediate data structures over time.

I also tried using the callback that is returned from the lambda of Observable.Create() but that fires as soon as Observable.Create() is done and not after the subscriber is done, so that led to race conditions (but I'm not even sure if I understood that correctly, RX + async is tricky).

So... how can I implement this properly?

For some background info, RunData includes (among other things) a DB transaction and an Autofac LifetimeScope, both of which I want to use throughout the pipeline, and both of which need to be disposed at the end, after the subscriber is done.


Solution

  • Your code has several issues that prevent it being safe to use in a production environment. I'll try to go thru them all, but I'll hit the most major design issue first.

    When using IDisposable objects in C#, you have two ways to manage lifetime.

    1. Either the creator of the IDisposable must call Dispose(),
    2. Or, you're writing a factory method, so the consumer must call Dispose().

    That's not specifically about observables, that's whenever, anyway, and at all times, when you create disposables in C#.

    Now, when an observable returns disposables then you are effectively writing a factory method, so the creator cannot call Dispose().

    But, you have no control of how many subscribers there are for your observable, so none of the consumers (observers/subscriptions) can know when they are done with the disposable.

    The bottom-line is that you CANNOT safely manage disposables if they are the output of an observable. DO NOT DO IT.

    Rx does offer a safe way of dealing with disposables that allows the pipeline to effectively manage the disposables lifetime - and that's with the Using operator. That's what you should use.

    Before I give an example, I'll just cover off on the 3 other issue I see in your code.

    1. You use external state with your if (someCondition). External state is bad in observables and you should learn to avoid it always. It's hard to tell what you're using this for in your code, so I'll ignore this issue for now.
    2. You're using a async in the Create call - while that's OK for create as there is a Func<IObserver<T>, Task<IDisposable>> overload, I'm always suspicious of this kind of code as the framework converts Func<Task> into Action as fire-and-forget calls - and these are hard to debug. Try to avoid!
    3. You're returning Disposable.Empty inside the Create. Whenever you do this it is likely that you're doing something wrong. Often this creates code that you cannot cancel and/or that creates deadlocks. There are very few times that you need to resort to Create in this way. Try to avoid!

    From your code in the question it appears that RunData produces a result. I'll assume that there is a GetResult() method on RunData that produces a value of type Result.

    To start, since CreateRunData is async you should add this handy helper method:

    public static class ObservableEx
    {
        public static IObservable<T> Using<T, R>(Func<Task<R>> resourceFactory, Func<R, IObservable<T>> observableFactory) where R : IDisposable =>
            from r in Observable.FromAsync(resourceFactory)
            from t in Observable.Using(() => r, observableFactory)
            select t;
    }
    

    Here are the two ways that I would write your code.

    (1)

    IObservable<Result> obs =
        ObservableEx
            .Using(
                () => CreateRunData(),
                rd =>
                    Observable
                        .Interval(TimeSpan.FromSeconds(2.0))
                        .Where(t => someCondition) // don't do this
                        .Select(t => rd.GetResult()))
            .Publish()
            .RefCount();
    

    This approach creates one instance of your resource and continues to use it until all of your subscribers have disposed of their subscriptions.

    (2)

    IObservable<Result> obs =
        Observable
            .Interval(TimeSpan.FromSeconds(2.0))
            .Where(t => someCondition) // don't do this
            .SelectMany(t =>
                ObservableEx
                    .Using(
                        () => CreateRunData(),
                        rd => Observable.Return(rd.GetResult())))
            .Publish()
            .RefCount();
    

    This version creates a new resource for each tick pf the Interval.

    Should GetResult actually be async then do these replacements:

    (1)

    Replace .Select(t => rd.GetResult())) with .SelectMany(t => Observable.FromAsync(() => rd.GetResultAsync()))).

    (2)

    Replace rd => Observable.Return(rd.GetResult()))) with rd => Observable.FromAsync(() => rd.GetResultAsync()))).

    Let me know if this helps.