Given a high-frequency observable stream of data, i want to only emit an item every XX seconds.
This is usually done in RX by using .Sample(TimeSpan.FromSeconds(XX))
However... I want the time-interval to vary based on some property on the data.
Let's say my data is:
class Position { ... public int Speed; }
If Speed is less than 100, I want to emit data every 5 seconds. If speed is hight than 100 it should be every 2 seonds.
Is that possible with off-the-shelf Sample() or do I need to build something myself?
Here is a low level implementation, utilizing the System.Reactive.Concurrency.Scheduler.SchedulePeriodic
extension method as a timer.
public static IObservable<TSource> Sample<TSource>(this IObservable<TSource> source,
Func<TSource, TimeSpan> intervalSelector, IScheduler scheduler = null)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (intervalSelector == null)
throw new ArgumentNullException(nameof(intervalSelector));
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<TSource>(observer =>
{
TimeSpan currentInterval = Timeout.InfiniteTimeSpan;
IDisposable timer = null;
TSource latestItem = default;
bool latestEmitted = true;
object locker = new object();
Action periodicAction = () =>
{
TSource itemToEmit;
lock (locker)
{
if (latestEmitted) return;
itemToEmit = latestItem;
latestItem = default;
latestEmitted = true;
}
observer.OnNext(itemToEmit);
};
return source.Subscribe(onNext: item =>
{
lock (locker)
{
latestItem = item;
latestEmitted = false;
}
var newInterval = intervalSelector(item);
if (newInterval != currentInterval)
{
timer?.Dispose();
timer = scheduler.SchedulePeriodic(newInterval, periodicAction);
currentInterval = newInterval;
}
}, onError: ex =>
{
timer?.Dispose();
observer.OnError(ex);
}, onCompleted: () =>
{
timer?.Dispose();
observer.OnCompleted();
});
});
}
Usage example:
observable.Sample(x => TimeSpan.FromSeconds(x.Speed < 100 ? 5.0 : 2.0));
The timer is restarted every time the intervalSelector
callback returns a different interval. In the extreme case that the interval is changed with every new item, then this custom operator will behave more like the built-in Throttle
than the built-in Sample
.
Unlike
Sample
,Throttle
's period is a sliding window. Each timeThrottle
receives a value, the window is reset. (citation)