Essentially what the subject says: I can't find a ready-made .Debounce() extension method (similar to how Throttle() is available out of the box).
Below is a rough and probably faulty idea of how it could be implemented (based on my understanding of the Rx.NET mentality).
I'm aware that the following implementation is most probably not thread-safe, and that it also conflates debounce with distinct, which is not ideal. Better ideas are needed to make this robust:
/// <summary>
/// Emits an element from the source observable sequence if and only if there are no other elements emitted during a given period of time specified by debounceTime.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence to debounce.</param>
/// <param name="debounceTime">Debouncing duration for each element.</param>
/// <param name="customComparer">The comparer used to compare each element with the previous one; when null, the default equality comparer is used.</param>
/// <returns>The debounced sequence.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="debounceTime"/> is less than or equal to TimeSpan.Zero.</exception>
/// <remarks>
/// <para>
/// This operator debounces the source sequence by holding on to each element for the duration specified in <paramref name="debounceTime"/>. If another
/// element is produced within this time window, the previous element is dropped and a new timer is started for the current element to try and debounce it,
/// thus restarting this whole process. For streams that never have gaps larger than or equal to <paramref name="debounceTime"/> between elements, the resulting
/// stream won't produce any elements at all. In order to reduce the volume of a stream whilst guaranteeing the periodic production of elements, consider using the
/// Observable.Sample set of operators.
/// </para>
/// </remarks>
public static IObservable<TSource> DebounceX<TSource>(this IObservable<TSource> source, TimeSpan debounceTime, IEqualityComparer<TSource> customComparer = null)
{
if (source == null) throw new ArgumentNullException(nameof(source));
if (debounceTime <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(debounceTime));
var debouncedElementsStream = Observable.Create<TSource>(Subscribe);
return debouncedElementsStream;
IDisposable Subscribe(IObserver<TSource> observer)
{
//substream of stable elements
var timer = (Timer)null;
var previousElement = default(TSource);
var comparer = customComparer ?? EqualityComparer<TSource>.Default;
return source.Subscribe(onNext: OnEachElementEmission_, onError: OnError_, onCompleted: OnCompleted_);
void OnEachElementEmission_(TSource value)
{
if (timer != null && comparer.Equals(previousElement, value)) return;
previousElement = value;
timer?.Dispose();
timer = new Timer(
state: null,
period: Timeout.InfiniteTimeSpan, // we only want the timer to fire once
dueTime: debounceTime, // after the debounce time has passed
callback: _ => OnElementHasRemainedStableForTheSpecificPeriod(value)
);
}
void OnError_(Exception exception)
{
timer?.Dispose();
timer = null;
observer.OnError(exception);
}
void OnCompleted_()
{
timer?.Dispose();
timer = null;
observer.OnCompleted();
}
void OnElementHasRemainedStableForTheSpecificPeriod(TSource value)
{
timer?.Dispose();
timer = null;
// Emit the debounced element: it has remained stable for the whole period specified by debounceTime.
observer.OnNext(value);
}
}
}
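One possible way to address the thread-safety concern mentioned above, as a rough sketch rather than a definitive implementation: the System.Threading.Timer is swapped for an IScheduler timer held in a SerialDisposable, and a lock plus a sequence number make a stale callback harmless. The names DebounceSafe, gate and latestId are made up for illustration, the optional scheduler parameter is an assumption, duplicate filtering is left out on purpose, and a pending element is still dropped on completion, as in the code above.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class SafeDebounceSketch
{
    // Hypothetical name; not part of Rx.NET.
    public static IObservable<T> DebounceSafe<T>(this IObservable<T> source, TimeSpan debounceTime, IScheduler scheduler = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (debounceTime <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(debounceTime));
        scheduler ??= DefaultScheduler.Instance; // assumption: default to the thread-pool based scheduler

        return Observable.Create<T>(observer =>
        {
            var gate = new object();                // guards latestId and all calls on the observer
            var pending = new SerialDisposable();   // holds the currently scheduled timer, if any
            long latestId = 0;                      // sequence number of the most recent element

            var subscription = source.Subscribe(
                value =>
                {
                    long id;
                    lock (gate) { id = ++latestId; }
                    // Assigning a new disposable cancels the previously scheduled timer.
                    pending.Disposable = scheduler.Schedule(debounceTime, () =>
                    {
                        lock (gate)
                        {
                            // A callback that lost the race against a newer element is ignored.
                            if (id == latestId) observer.OnNext(value);
                        }
                    });
                },
                error => { lock (gate) { latestId++; pending.Dispose(); observer.OnError(error); } },
                () => { lock (gate) { latestId++; pending.Dispose(); observer.OnCompleted(); } });

            return new CompositeDisposable(subscription, pending);
        });
    }
}
```

Passing the scheduler in also makes the operator testable with virtual time instead of real waits.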
Please check this example; it debounces without checking for duplicates:
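// Requires: using System; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Threading;
static void Main()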
{
IObservable<T> Debounce<T>(IObservable<T> source, TimeSpan delay)
{
return source
.Do(x => TimestampedPrint($"Source: {x}"))
.Select(x => Observable.Return(x).Delay(delay))
.Switch();
}
void TimestampedPrint(object o) => Console.WriteLine($"{DateTime.Now:HH.mm.ss.fff}: {o}");
Subject<int> subject = new Subject<int>();
TimestampedPrint("Started"); // produces the "Started" line in the output
Debounce(subject, TimeSpan.FromSeconds(1))
.Subscribe(x => TimestampedPrint($"Result: {x}"));
subject.OnNext(1);
Thread.Sleep(1100);
subject.OnNext(2);
Thread.Sleep(1100);
subject.OnNext(3);
Thread.Sleep(500);
subject.OnNext(4);
Thread.Sleep(500);
subject.OnNext(5);
Console.ReadLine();
}
Resulting:
23.24.58.633: Started
23.24.58.676: Source: 1
23.24.59.708: Result: 1
23.24.59.783: Source: 2
23.25.00.785: Result: 2
23.25.00.884: Source: 3
23.25.01.386: Source: 4
23.25.01.888: Source: 5
23.25.02.888: Result: 5
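The same run can be reproduced without real waiting. The sketch below is only an illustration under a few assumptions: the Debounce helper gets an extra scheduler parameter (not in the code above), the class name DebounceDemo is made up, and HistoricalScheduler from System.Reactive stands in for the wall clock so that Thread.Sleep becomes AdvanceBy.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DebounceDemo
{
    // Same Select/Delay/Switch idea as above, with the scheduler passed in (an assumption for testability).
    public static IObservable<T> Debounce<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler) =>
        source
            .Select(x => Observable.Return(x).Delay(delay, scheduler)) // each element becomes a delayed one-shot sequence
            .Switch();                                                 // a newer element cancels the pending older one

    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var subject = new Subject<int>();
        var results = new List<int>();

        using var subscription = subject.Debounce(TimeSpan.FromSeconds(1), scheduler).Subscribe(results.Add);

        subject.OnNext(1);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100));
        subject.OnNext(2);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100));
        subject.OnNext(3);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));
        subject.OnNext(4);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));
        subject.OnNext(5);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100)); // let the last element settle

        return results; // 1, 2, 5 (3 and 4 are superseded before their delay elapses)
    }
}
```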
The extension with an equality comparer, if I understood your idea correctly:
IObservable<T> Debounce<T>(IObservable<T> source, TimeSpan delay, IEqualityComparer<T> comparer = null)
{
return source
.Do(x => TimestampedPrint($"Source: {x}"))
.DistinctUntilChanged(comparer ?? EqualityComparer<T>.Default)
.Select(x => Observable.Return(x).Delay(delay))
.Switch();
}
This case gives me the following output, when publishing only "1"s:
23.26.30.288: Started
23.26.30.346: Source: 1
23.26.31.376: Result: 1
23.26.31.455: Source: 1
23.26.32.557: Source: 1
23.26.33.057: Source: 1
23.26.33.558: Source: 1
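For comparison, the built-in Throttle operator in Rx.NET already drops an element when another one follows within the due time, which is the behaviour other Rx implementations call debounce. The sketch below assumes that equivalence is what is wanted here; the name DebounceViaThrottle and the scheduler parameter are made up for illustration, and HistoricalScheduler is used only to replay the "1"s from the run above in virtual time.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ThrottleDebounceSketch
{
    // Hypothetical helper: DistinctUntilChanged filters repeats, Throttle waits for a quiet period.
    public static IObservable<T> DebounceViaThrottle<T>(
        this IObservable<T> source, TimeSpan delay, IScheduler scheduler, IEqualityComparer<T> comparer = null) =>
        source
            .DistinctUntilChanged(comparer ?? EqualityComparer<T>.Default)
            .Throttle(delay, scheduler);

    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var subject = new Subject<int>();
        var results = new List<int>();

        using var subscription = subject
            .DebounceViaThrottle(TimeSpan.FromSeconds(1), scheduler)
            .Subscribe(results.Add);

        // Publish only 1s, with the same spacing as in the run above.
        subject.OnNext(1);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100));
        subject.OnNext(1);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100));
        subject.OnNext(1);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));
        subject.OnNext(1);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));
        subject.OnNext(1);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100));

        return results; // a single 1: the repeats never reach Throttle
    }
}
```

Note that DistinctUntilChanged suppresses a repeated value even after the previous one has already been emitted, which is stricter than the duplicate check in the question's implementation (that one only ignores duplicates while a timer is pending).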