I'm struggling to write an operator (an extension method) that transforms an initial observable into another one with some kind of back-pressure control (similar to the Throttle operator), with the following behavior:
Let's say the "delay" is 1 second (the duration between A and B below).
From the initial observable below:
A---B-C-----D-E--------F--
my target would be:
1---2---3---4---5------6--
I'm mainly interested in the signal that something occurred on the initial observable (i.e. the target can be an IObservable<Unit>).
In the example above,
Here is a unit test example checking the expected behavior:
[Fact]
public void Custom_operator_has_expected_behavior_for_handling_back_pressure()
{
    // Arrange
    TimeSpan configuredDelay = TimeSpan.FromSeconds(1);
    TestScheduler testScheduler = new TestScheduler();
    IObservable<char> initial = Observable.Create<char>(
        async o =>
        {
            // First event at time 1 tick
            o.OnNext('A');
            // waiting 700ms before sending second event at time 700ms + 1 tick
            await testScheduler.Sleep(TimeSpan.FromSeconds(0.7));
            o.OnNext('B');
            // waiting 1s before sending third event at time 1s 700ms + 1 tick
            await testScheduler.Sleep(TimeSpan.FromSeconds(1));
            o.OnNext('C');
            // waiting 1s 800ms before sending fourth event at time 3s 500ms + 1 tick
            await testScheduler.Sleep(TimeSpan.FromSeconds(1.8));
            o.OnNext('D');
            return Disposable.Empty;
        });
    // Throttle is only a placeholder here for the operator I am looking for
    IObservable<Unit> result = initial.Throttle(configuredDelay, testScheduler).Select(_ => Unit.Default);

    // Act
    var testObserver = testScheduler.Start(() => result, 0, 0, TimeSpan.FromSeconds(10).Ticks);

    // Assert
    // four items are expected, one per inspector below
    testObserver.Messages.Count.Should().Be(4);
    testObserver.Messages.Should().SatisfyRespectively(
        m =>
        {
            // first item on result is expected immediately
            m.Time.Should().Be(1L);
        },
        m =>
        {
            // second item on result is at 1s (generated by 'B' but emitted at the end of the 1s delay from the previously emitted item)
            m.Time.Should().Be(10000001L);
        },
        m =>
        {
            // third item on result is at 2s (generated by 'C' but emitted at the end of the 1s delay from the previously emitted item)
            m.Time.Should().Be(20000001L);
        },
        m =>
        {
            // fourth item on result is at 3.5s, generated by 'D' and emitted directly since the previous item was emitted more than 1s earlier
            m.Time.Should().Be(35000001L);
        });
}
I have tried playing with Throttle, Buffer and Window(windowOpening, windowClosing), but I could not work out how to generate item 3.
I also struggled to get item 6 generated, because it is not derived from the fixed time window once the back-pressure period is over.
The closest I have found is the suggestion made in this post: Equivalent of RxJS throttleTime in Rx.NET
This looks like a job for the well-known custom Pace operator:
public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
    => source
        .Select(p => Observable.Empty<T>().DelaySubscription(interval).StartWith(p))
        .Concat();
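Since your test drives time through a TestScheduler, a scheduler-aware overload may be handy. The following is only a sketch: the class name ObservableExtensions and the extra scheduler parameter are my own additions, and I have not run it against your test. It relies on the DelaySubscription overload that accepts an IScheduler.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical container class; the name is only for illustration.
public static class ObservableExtensions
{
    // Same idea as Pace above, but the delay runs on the supplied scheduler
    // (assumption: you pass the TestScheduler here in unit tests).
    public static IObservable<T> Pace<T>(
        this IObservable<T> source, TimeSpan interval, IScheduler scheduler)
        => source
            // Each item becomes a tiny sequence: emit the item at once,
            // then complete only after 'interval' has elapsed.
            .Select(p => Observable.Empty<T>()
                .DelaySubscription(interval, scheduler)
                .StartWith(p))
            // Concat subscribes to the inner sequences one after another,
            // so the next item cannot be emitted before the previous delay ends.
            .Concat();
}
```

In your test, the placeholder line would then become something like initial.Pace(configuredDelay, testScheduler).Select(_ => Unit.Default). Note that Concat queues pending items rather than dropping them, so a source that stays faster than the interval for a long time will build up a backlog.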
I ran your observable without the test scheduler, first without and then with Pace, as follows:
initial.TimeInterval().Subscribe(x => Debug.WriteLine(x));
initial.Pace(configuredDelay).TimeInterval().Subscribe(x => Debug.WriteLine(x));
and got the following output, respectively:
A@00:00:00.0010807
B@00:00:00.7222620
C@00:00:01.0153906
D@00:00:01.8098370
A@00:00:00.0109996
B@00:00:01.0184339
C@00:00:01.0176319
D@00:00:01.4984127