c# .net system.reactive reactivex

Reactive operator to set up Timeout behaviour, but only once a certain number of items are returned


I have a scenario where I have an Observable sequence of measurements coming from an instrument, which only fires a measurement event when the value has changed by a certain amount.

The underlying data source raises .NET events rather than exposing an IObservable<T> stream, so I'm converting the events with Observable.FromEventPattern. That attaches the event handler when the observable is subscribed and detaches it when the subscription is disposed. My add and remove handlers also turn the measuring instrument on and off, so I need to manage the lifetime of the observable returned by Observable.FromEventPattern correctly.

// Example of FromEventPattern with the start/stop measurement lifecycle,
// plus logic to return the current value immediately on subscription
public static IObservable<Double> MeasurementObservable(this MyInstrument instrument)
{
    return Observable.Defer(
      () => Observable
        .FromEventPattern<NewMeasurementEventHandler, Double>(
            h =>
            {
                instrument.NewMeasurementEvent += h;
                instrument.StartMeasuring();
            },
            h =>
            {
                instrument.StopMeasuring();
                instrument.NewMeasurementEvent -= h;
            })
        .Select(eventPattern => eventPattern.EventArgs)
        // Return the current value immediately, then raise on events
        .Prepend(instrument.CurrentValue)
     );
}

The user of my app initiates the measurement in the app, and then manually starts the process whose results the instrument measures. There's no explicit indication to my app that the process being measured has actually begun or ended. I can tell implicitly that it has started when I begin receiving a significant number of change events from the instrument, and that it has ended when the instrument stops raising change-of-measurement events.

So, I'm trying to write a Reactive operator which waits until it has received at least N measurements from the IObservable (which proves that the process is actually running), and then applies Timeout logic that gracefully completes the measurement and unsubscribes from the underlying source when new values stop arriving (which indicates that the measured process is complete). Here is my current attempt:

public static class ObservableExtensions
{
    /// <summary>
    /// After the Observable has yielded an initial set of items, implement a timeout policy
    /// which completes the Observable if no more items are observed for a specified time.
    /// </summary>
    /// <typeparam name="T">The type of the items in the source sequence.</typeparam>
    /// <param name="source">The source sequence to observe.</param>
    /// <param name="initialItems">The number of items which must be yielded before implementing the timeout policy.</param>
    /// <param name="dueTime">The timeout duration which signals the completion of the stream.</param>
    /// <returns>A sequence that relays the source items and completes once updates stop.</returns>
    public static IObservable<T> CompleteWhenUpdatesStop<T>(this IObservable<T> source, Int32 initialItems, TimeSpan dueTime)
    {
        var refCounted = source.Publish().RefCount();
        return refCounted.Take(initialItems)
            .Concat(refCounted.Timeout(dueTime, Observable.Empty<T>()));
    }
}

However, this isn't working well. Although I tried to use Publish().RefCount() to ensure that the underlying FromEventPattern observable isn't unsubscribed until the timeout fires, what actually happens is that the Take() returns items, then the underlying subscription is disposed, and then a new subscription is made to the underlying Observable but now with the Timeout configured. That messes with the event lifecycle management in my current situation, but also means that if you use this operator with a cold Observable you can get the original values repeated. I don't want that either - the consumer should only see each value that was emitted by the underlying Observable exactly once.

What's the right way to implement the Timeout logic so that it only takes effect once I've collected the N initial items?


Solution

  • It looks like I can fix the lifecycle management issues by using the overload of Observable.Publish() that takes a Func<IObservable<T>, IObservable<T>> selector argument:

        public static IObservable<T> CompleteWhenUpdatesStop<T>(this IObservable<T> source, Int32 initialItems, TimeSpan dueTime, IScheduler scheduler)
        {
            return source.Publish(published =>
                published.Take(initialItems)
                    .Concat(published.Timeout(dueTime, Observable.Empty<T>(), scheduler)));
        }
    

    This overload keeps the underlying subscription alive for the entire lifetime of the observable returned by the selector, so the source is subscribed to only once and is not torn down between the Take() and the Timeout().
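
    To check the behaviour without a real instrument, here is a minimal sketch that drives the operator with a virtual clock. It assumes the System.Reactive package; the Subject standing in for the instrument, the Run harness, the counters, and the chosen values and durations are all hypothetical and only there for illustration. HistoricalScheduler is used so that time advances only when the code says so.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CompleteWhenUpdatesStopDemo
{
    // Same operator as in the answer, repeated so the sketch compiles on its own.
    public static IObservable<T> CompleteWhenUpdatesStop<T>(
        this IObservable<T> source, int initialItems, TimeSpan dueTime, IScheduler scheduler)
    {
        return source.Publish(published =>
            published.Take(initialItems)
                .Concat(published.Timeout(dueTime, Observable.Empty<T>(), scheduler)));
    }

    // Hypothetical harness: returns what the consumer saw plus lifecycle counters.
    public static (List<double> Seen, bool Completed, int Subscribed, int Disposed) Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced by hand
        var instrument = new Subject<double>();    // stand-in for the event source
        int subscribed = 0, disposed = 0;

        // Count how often the underlying source is subscribed to and torn down.
        var source = Observable.Create<double>(observer =>
        {
            subscribed++;
            var inner = instrument.Subscribe(observer);
            return () => { disposed++; inner.Dispose(); };
        });

        var seen = new List<double>();
        var completed = false;
        using var subscription = source
            .CompleteWhenUpdatesStop(3, TimeSpan.FromSeconds(5), scheduler)
            .Subscribe(seen.Add, () => completed = true);

        instrument.OnNext(1.0);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(60)); // long gap, but the timeout is not armed yet
        instrument.OnNext(2.0);
        instrument.OnNext(3.0);                        // third item: the timeout applies from here on
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2));
        instrument.OnNext(4.0);                        // arrives in time and restarts the timer
        scheduler.AdvanceBy(TimeSpan.FromSeconds(10)); // silence longer than dueTime

        return (seen, completed, subscribed, disposed);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine(string.Join(", ", result.Seen));
        Console.WriteLine(
            $"completed={result.Completed}, subscribed={result.Subscribed}, disposed={result.Disposed}");
    }
}
```

    Under these assumptions the consumer should see 1, 2, 3 and 4 once each and then a completion, with the source subscribed to a single time and disposed when the timeout fires. The 60-second gap after the first item does not end the sequence, because the Timeout() is only subscribed once Take() has completed.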