C# Rx How to properly dispose of source Enumerable in created Observable


I would like to adapt a source that is both IEnumerable and IDisposable into an Observable, and I would like to know the best way to do this so that source.Dispose gets called upon unsubscribe.

There is an example on introtorx.com of adapting an IEnumerable, but it explicitly states that it has many shortcomings (incorrect disposal pattern, poor concurrency model, no error handling, and so on) and that the built-in version handles these. However, the built-in version doesn't seem to call Dispose on the source IEnumerable upon unsubscription.

Ideally I'd like to use the .Publish().RefCount() pattern to have multiple subscribers on the same source and only have the source Dispose() called when they are all unsubscribed.

Here is the code for my attempt, though it's not working.

static void FromEnumerableTest() {
    var observable = Observable.Create<int>(
        observer => {
            var source = new JunkEnumerable();
            foreach (int i in source) {
                observer.OnNext(i);
            }
            return () => {
                source.Dispose();
            };
        })
        .SubscribeOn(Scheduler.Default)
        .Do(i => Console.WriteLine("Publishing {0}", i))    // side effect to show it is running
        .Publish()
        .RefCount();

    //var observable = Observable.ToObservable(new JunkEnumerable())
    //    .SubscribeOn(Scheduler.Default)
    //    .Do(i => Console.WriteLine("Publishing {0}", i))    // side effect to show it is running
    //    .Publish()
    //    .RefCount();

    Console.WriteLine("Press any key to subscribe");
    Console.ReadKey();

    var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
    subscription.Dispose();

    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
}


class JunkEnumerable : IEnumerable<int>, IDisposable {
    public void Dispose() { Console.WriteLine("JunkEnumerable.Dispose invoked"); }

    public IEnumerator<int> GetEnumerator() { return new Enumerator(); }

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

    class Enumerator : IEnumerator<int> {
        private int counter = 0;
        public int Current {
            get {
                Thread.Sleep(1000);
                return counter++;
            }
        }

        object IEnumerator.Current { get { return Current; } }

        public void Dispose() { Console.WriteLine("JunkEnumerable.Enumerator.Dispose invoked"); }

        public bool MoveNext() { return true; }

        public void Reset() { counter = 0; }
    }
}

Solution

  • There are three stages in an Rx subscription-lifetime:

    1. Subscription
    2. Observation
    3. Unsubscription

    If the subscription stage never completes, the unsubscription code never runs. After all, if you never fully subscribed, why would you need to unsubscribe? In your sample, the delegate passed to Observable.Create contains an infinite foreach loop, so it never reaches the return statement that hands back the disposal action. With no disposal action returned, source.Dispose() is never called.
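
    To make the point about the subscription stage concrete, here is a minimal sketch of a subscribe delegate that returns right away. It uses the Task-based overload of Observable.Create and moves the loop into Task.Run. This is an illustration only, not the asker's or answerer's code: CountingSource and FromDisposable are hypothetical names, the 200 ms delay is arbitrary, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's JunkEnumerable: endless and disposable.
sealed class CountingSource : IEnumerable<int>, IDisposable
{
    public void Dispose() => Console.WriteLine("CountingSource.Dispose invoked");

    public IEnumerator<int> GetEnumerator()
    {
        for (int i = 0; ; i++)
        {
            Thread.Sleep(200); // simulate slow, blocking production
            yield return i;
        }
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

static class Program
{
    // Hypothetical helper: the subscribe delegate only starts a task and returns,
    // so the subscription stage completes and unsubscribing can cancel the token.
    static IObservable<int> FromDisposable(Func<CountingSource> factory) =>
        Observable.Create<int>((observer, cancellationToken) =>
            Task.Run(() =>
            {
                using (var source = factory())
                {
                    foreach (int i in source)
                    {
                        if (cancellationToken.IsCancellationRequested) break;
                        observer.OnNext(i);
                    }
                } // source.Dispose() runs here, on the worker thread
            }));

    static void Main()
    {
        var subscription = FromDisposable(() => new CountingSource())
            .Subscribe(i => Console.WriteLine("subscription : {0}", i));

        Thread.Sleep(1000);      // let a few values through
        subscription.Dispose();  // cancels the token; the loop exits at its next check
        Thread.Sleep(500);       // give the worker time to dispose the source
    }
}
```

    Note that in this sketch disposal is cooperative: the source is only disposed once the loop observes the cancelled token, which can lag behind the unsubscribe call by up to one iteration.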

    The normal way to handle an IDisposable is with Observable.Using. The normal way to handle an IEnumerable is with .ToObservable. If you're trying to introduce asynchrony to synchronous, enumerable code (like your example), you can do so as follows:

    var observable = Observable.Using(
        () => new JunkEnumerable(),
        junk => Observable.Generate(
            junk.GetEnumerator(),                  // initial state
            e => e.MoveNext(),                     // condition
            e => e,                                // iterate
            e => e.Current,                        // result selector
            e => TimeSpan.FromMilliseconds(20)));  // time selector

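    As long as the TimeSpan is greater than 15 milliseconds, Rx will run the generation asynchronously, which lets the subscription stage complete. The subsequent values are then part of the observation stage, and unsubscription takes place in full, including the Dispose call that Observable.Using makes on the source.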
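
    Since the question also asks about sharing one source between several subscribers, here is a rough sketch of the snippet above combined with .Publish().RefCount(). DisposableNumbers is a hypothetical stand-in for JunkEnumerable, the delays are arbitrary, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical stand-in for the question's JunkEnumerable.
sealed class DisposableNumbers : IEnumerable<int>, IDisposable
{
    public void Dispose() => Console.WriteLine("DisposableNumbers.Dispose invoked");

    public IEnumerator<int> GetEnumerator()
    {
        for (int i = 0; ; i++) yield return i;
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

static class Program
{
    static void Main()
    {
        var observable = Observable.Using(
                () => new DisposableNumbers(),
                source => Observable.Generate(
                    source.GetEnumerator(),
                    e => e.MoveNext(),
                    e => e,
                    e => e.Current,
                    e => TimeSpan.FromMilliseconds(200)))
            .Do(i => Console.WriteLine("Publishing {0}", i)) // side effect to show it is running
            .Publish()
            .RefCount();

        var first = observable.Subscribe(i => Console.WriteLine("first  : {0}", i));
        var second = observable.Subscribe(i => Console.WriteLine("second : {0}", i));

        Thread.Sleep(1000);
        first.Dispose();   // one subscriber left: the shared source stays connected
        Thread.Sleep(1000);
        second.Dispose();  // last subscriber gone: RefCount disconnects, Using disposes the source
    }
}
```

    The expectation is that the Dispose message appears only after the second subscription is disposed. As in the snippet above, the enumerator returned by GetEnumerator is not itself disposed here; only the source object is.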