Search code examples
c#system.reactiverx.net

.NET ReactiveExtensions: Use Sample() with variable timespan


Given a high-frequency observable stream of data, i want to only emit an item every XX seconds.

This is usually done in RX by using .Sample(TimeSpan.FromSeconds(XX))

However... I want the time-interval to vary based on some property on the data.

Let's say my data is:

class Position { ... public int Speed; }

If Speed is less than 100, I want to emit data every 5 seconds. If speed is hight than 100 it should be every 2 seonds.

Is that possible with off-the-shelf Sample() or do I need to build something myself?


Solution

  • Here is a low level implementation, utilizing the System.Reactive.Concurrency.Scheduler.SchedulePeriodic extension method as a timer.

    public static IObservable<TSource> Sample<TSource>(this IObservable<TSource> source,
        Func<TSource, TimeSpan> intervalSelector, IScheduler scheduler = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (intervalSelector == null)
            throw new ArgumentNullException(nameof(intervalSelector));
        scheduler = scheduler ?? Scheduler.Default;
    
        return Observable.Create<TSource>(observer =>
        {
            TimeSpan currentInterval = Timeout.InfiniteTimeSpan;
            IDisposable timer = null;
            TSource latestItem = default;
            bool latestEmitted = true;
            object locker = new object();
    
            Action periodicAction = () =>
            {
                TSource itemToEmit;
                lock (locker)
                {
                    if (latestEmitted) return;
                    itemToEmit = latestItem;
                    latestItem = default;
                    latestEmitted = true;
                }
                observer.OnNext(itemToEmit);
            };
    
            return source.Subscribe(onNext: item =>
            {
                lock (locker)
                {
                    latestItem = item;
                    latestEmitted = false;
                }
                var newInterval = intervalSelector(item);
                if (newInterval != currentInterval)
                {
                    timer?.Dispose();
                    timer = scheduler.SchedulePeriodic(newInterval, periodicAction);
                    currentInterval = newInterval;
                }
            }, onError: ex =>
            {
                timer?.Dispose();
                observer.OnError(ex);
            }, onCompleted: () =>
            {
                timer?.Dispose();
                observer.OnCompleted();
            });
        });
    }
    

    Usage example:

    observable.Sample(x => TimeSpan.FromSeconds(x.Speed < 100 ? 5.0 : 2.0));
    

    The timer is restarted every time the intervalSelector callback returns a different interval. In the extreme case that the interval is changed with every new item, then this custom operator will behave more like the built-in Throttle than the built-in Sample.

    Unlike Sample, Throttle's period is a sliding window. Each time Throttle receives a value, the window is reset. (citation)