Search code examples
c#system.reactiveidisposable.net-5rx.net

Observable timers disposing


I'm using the Reactive .NET extensions and I wonder about its disposal. I know in some cases it's good to dispose it like that: .TakeUntil(Observable.Timer(TimeSpan.FromMinutes(x))). I

First case

In this case, I have a timer that triggers after x seconds and then it completes and should be disposed.

public void ScheduleOrderCancellationIfNotFilled(string pair, long orderId, int waitSecondsBeforeCancel)
{
    Observable.Timer(TimeSpan.FromSeconds(waitSecondsBeforeCancel))
        .Do(e =>
        {
            var result = _client.Spot.Order.GetOrder(pair, orderId);

            if (result.Success)
            {
                if (result.Data?.Status != OrderStatus.Filled)
                {
                    _client.Spot.Order.CancelOrder(pair, orderId);
                }
            }
        })
        .Subscribe();
}

Second case

In this case, the timer runs on the first second and then it repeats itself on each 29 minutes. This should live until its defining class is disposed. I believe this one should be disposed with IDisposable implementation. How?

var keepAliveListenKey = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(29))
    .Do(async e =>
    {
        await KeepAliveListenKeyAsync().ConfigureAwait(false);
    })
    .Subscribe();

Edit

I also want it to be using a Subject<T> which makes it easier to dispose and to reset the subscription.

For ex. Reset and Dispose observable subscriber, Reactive Extensions (@Enigmativity)

public class UploadDicomSet : ImportBaseSet
{
    IDisposable subscription;
    Subject<IObservable<long>> subject = new Subject<IObservable<long>>();

    public UploadDicomSet()
    {
        subscription = subject.Switch().Subscribe(s => CheckUploadSetList(s));
        subject.OnNext(Observable.Interval(TimeSpan.FromMinutes(2)));
    }

    void CheckUploadSetList(long interval)
    {
        subject.OnNext(Observable.Never<long>());
        // Do other things
    }

    public void AddDicomFile(SharedLib.DicomFile dicomFile)
    {
        subject.OnNext(Observable.Interval(TimeSpan.FromMinutes(2)));
        // Reset the subscription to go off in 2 minutes from now
        // Do other things
    }
}

Solution

  • In the first case it gonna be disposed automatically. It is, actually, a common way to achieve automatic subscription management and that's definitely nice and elegant way to deal with rx.

    In the second case you have over-engineered. Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)) is itself sufficient to generate a sequence of ascending longs over time. Since this stream is endless by its nature, you right - explicit subscription management is required. So it is enough to have:

    var sub = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Subscribe()

    ...and sub.Dispose() it later.

    P.S. Note that in your code you .Do async/await. Most probably that is not what you want. You want SelectMany to ensure that async operation is properly awaited and exceptions handled.


    Answering your questions in the comments section:

    What about disposing using Subject instead?

    Well, nothing so special about it. Both IObserver<>, IObservable<> is implemented by this class such that it resembles classical .NET events (list of callbacks to be called upon some event). It does not differ in any sense with respect to your question and use-case.

    May you give an example about the .Do with exception handling?

    Sure. The idea is that you want translate your async/await encapsulated into some Task<T> to IObservable<T> such that is preserves both cancellation and error signals. For that .SelectMany method must be used (like SelectMany from LINQ, the same idea). So just change your .Do to .SelectMany.

    Observable
        .Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1))
        .SelectMany(_ => Observable.FromAsync(() => /* that's the point where your Task<> becomes Observable */ myTask))
    

    I'm confused again. Do I need IObservable<IObservable> (Select) or IObservable (SelectMany)

    Most probably, you don't need switch. Why? Because it was created mainly to avoid IO race conditions, such that whenever new event is emitted, the current one (which might be in progress due to natural parallelism or asynchronous workflow) is guaranteed to be cancelled (i.e. unsubscribed). Otherwise race conditions can (and will) damage your state.

    SelectMany, on the contrary, will make sure all of them are happen sequentially, in some total order they have indeed arrived. Nothing will be cancelled. You will finish (await, if you wish) current callback and then trigger the next one. Of course, such behavior can be altered by means of appropriate IScheduler, but that is another story.