I'm struggling to write some operator/method generating an observable that would transform an initial observable into another one but with some kind of back pressure control (like Throttle operator) having the following behavior:
Lets say the "delay" is 1 second (duration between A---B below).
From the initial observable below A---B-C-----D-E--------F--
My target would be 1---2---3---4---5------6--
I'm mainly interested in the signal that something occured on the initial observable (ie. target can be a IObservable<Unit>)
In the example above,
Here is an a unit test example checking the expected behavior
public void Custom_operator_has_expected_behavior_for_handling_back_pressure()
// Arrange
TimeSpan configuredDelay = TimeSpan.FromSeconds(1);
TestScheduler testScheduler = new TestScheduler();
IObservable<char> initial = Observable.Create<char>(
async o =>
// First event at time 1 tick
// waiting 700ms before sending second event at time 700ms + 1 tick
await testScheduler.Sleep(TimeSpan.FromSeconds(0.7));
// waiting 1s before sending third event at time 1s 700ms + 1 tick
await testScheduler.Sleep(TimeSpan.FromSeconds(1));
// waiting 1s 800ms before sending third event at time 3s 500ms + 1 tick
await testScheduler.Sleep(TimeSpan.FromSeconds(1.8));
return Disposable.Empty;
IObservable<Unit> result = initial.Throttle(configuredDelay, testScheduler).Select(_ => Unit.Default);
// Act
var testObserver = testScheduler.Start(() => result, 0, 0, TimeSpan.FromSeconds(10).Ticks);
// Assert
m =>
// first item on result is expected immediately
m =>
// second item on result is at 1s (generated by 'B' but emitted at the end of 1s delay from previous emitted item)
m =>
// third item on result is at 2s (generated by 'c' but emitted at the end of 1s delay from previous emitted item)
m =>
// fourth item on result is at 3.5s generated by 'd' and emitted directly since previous item was long before the 1s delay period
I have tried playing with Throttle, Buffer and Window(windowOpening, windowClosing) but I didn't find out how to handle generating the generation of 3.
Also, I struggled with having 6 being generated because it is not derivated from the fixed time window when the back pressure period is over
What comes closer is the suggestion made in this post : Equivalent of RxJS throttleTime in Rx.NET
It is looking like as a well-known Pace custom operator:
public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
=> source
.Select(p => Observable.Empty<T>().DelaySubscription(interval).StartWith(p))
I ran your observable without test scheduler and with and without Pace as following:
initial.TimeInterval().Subscribe(x => Debug.WriteLine(x));
initial.Pace(configuredDelay).TimeInterval().Subscribe(x => Debug.WriteLine(x));
And got these respectively: