Tags: c#, observable, system.reactive, backpressure

RX operator to signal changes on an original observable emitting directly when possible


I'm struggling to write an operator/method that transforms an initial observable into another one with some kind of back pressure control (similar to the Throttle operator), with the following behavior:

Let's say the "delay" is 1 second (the duration between A---B below).

From the initial observable below:

    A---B-C-----D-E--------F--

my target would be:

    1---2---3---4---5------6--

I'm mainly interested in the signal that something occurred on the initial observable (i.e. the target can be an IObservable<Unit>).

In the example above,

  • the C item is at the origin of the 3 in the target observable; it was not emitted right away because it is too close to B, which has already generated the 2
  • the D is at the origin of the 4 in the target observable; it was emitted right away because the interval since the previous item 3 exceeds the "delay" period, so no back pressure needs to be applied
  • the F is at the origin of the 6 and generates it immediately, because the "back pressure" delay following 5 is over
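
The timing rule implied by these bullets can be sketched as a small model, independent of Rx: each item is emitted at its arrival time, or one "delay" after the previous emission, whichever is later. `PacingModel` and `EmissionTimes` are hypothetical names introduced only for illustration, and the sketch assumes the arrival times are given in ascending order.

```csharp
using System;
using System.Collections.Generic;

public static class PacingModel
{
    // Hypothetical helper, not an Rx operator: computes when each item
    // would be emitted under the rule
    //   emission[i] = max(arrival[i], emission[i - 1] + delay).
    // Assumes arrivals are sorted in ascending order.
    public static List<TimeSpan> EmissionTimes(IReadOnlyList<TimeSpan> arrivals, TimeSpan delay)
    {
        var emissions = new List<TimeSpan>(arrivals.Count);
        foreach (TimeSpan arrival in arrivals)
        {
            TimeSpan emitAt = arrival;
            if (emissions.Count > 0)
            {
                // Earliest moment allowed by the previous emission.
                TimeSpan earliest = emissions[emissions.Count - 1] + delay;
                if (earliest > emitAt)
                {
                    emitAt = earliest;
                }
            }
            emissions.Add(emitAt);
        }
        return emissions;
    }
}
```

With a 1 second delay and arrivals at 0s, 0.7s, 1.7s and 3.5s (the values used in the unit test), this model yields emissions at 0s, 1s, 2s and 3.5s.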

Here is a unit test example checking the expected behavior:

[Fact]
public void Custom_operator_has_expected_behavior_for_handling_back_pressure()
{
    // Arrange
    TimeSpan configuredDelay = TimeSpan.FromSeconds(1);
    TestScheduler testScheduler = new TestScheduler();
    IObservable<char> initial = Observable.Create<char>(
        async o =>
        {
            // First event at time 1 tick
            o.OnNext('A');

            // waiting 700ms before sending second event at time 700ms + 1 tick
            await testScheduler.Sleep(TimeSpan.FromSeconds(0.7));

            o.OnNext('B');
            
            // waiting 1s before sending third event at time 1s 700ms + 1 tick
            await testScheduler.Sleep(TimeSpan.FromSeconds(1));

            o.OnNext('C');
            
            // waiting 1s 800ms before sending fourth event at time 3s 500ms + 1 tick
            await testScheduler.Sleep(TimeSpan.FromSeconds(1.8));

            o.OnNext('D');

            return Disposable.Empty;
        });

    IObservable<Unit> result = initial.Throttle(configuredDelay, testScheduler).Select(_ => Unit.Default);
    
    // Act
    var testObserver = testScheduler.Start(() => result, 0, 0, TimeSpan.FromSeconds(10).Ticks);

    // Assert
    testObserver.Messages.Count.Should().Be(4);
    testObserver.Messages.Should().SatisfyRespectively(
        m =>
        {
            // first item on result is expected immediately
            m.Time.Should().Be(1L);
        },
        m =>
        {
            // second item on result is at 1s (generated by 'B' but emitted at the end of 1s delay from previous emitted item)
            m.Time.Should().Be(10000001L);
        },
        m =>
        {
            // third item on result is at 2s (generated by 'C' but emitted at the end of 1s delay from previous emitted item)
            m.Time.Should().Be(20000001L);
        },
        m =>
        {
            // fourth item on result is at 3.5s, generated by 'D' and emitted directly since more than the 1s delay has passed since the previous emitted item
            m.Time.Should().Be(35000001L);
        });
}

I have tried playing with Throttle, Buffer and Window(windowOpening, windowClosing), but I couldn't work out how to generate the 3.

I also struggled to get the 6 generated, because it is not derived from the fixed time window once the back pressure period is over.

The closest I have found is the suggestion made in this post: Equivalent of RxJS throttleTime in Rx.NET


Solution

  • This looks like the well-known custom Pace operator:

        public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
            => source
                .Select(p => Observable.Empty<T>().DelaySubscription(interval).StartWith(p))
                .Concat();
    

    I ran your observable without the test scheduler, first without and then with Pace, as follows:

    initial.TimeInterval().Subscribe(x => Debug.WriteLine(x));
    initial.Pace(configuredDelay).TimeInterval().Subscribe(x => Debug.WriteLine(x));
    

    And got these, respectively (TimeInterval reports the time elapsed since the previous element, not the absolute time):

    A@00:00:00.0010807
    B@00:00:00.7222620
    C@00:00:01.0153906
    D@00:00:01.8098370
    
    A@00:00:00.0109996
    B@00:00:01.0184339
    C@00:00:01.0176319
    D@00:00:01.4984127
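
To run Pace under the virtual-time test from the question, the delay would presumably need to be scheduled on the TestScheduler rather than on the default scheduler. Below is a minimal sketch of such an overload. The `scheduler` parameter and the `PaceExtensions` class name are my additions for illustration and are not part of the operator shown above; the sketch relies only on the existing `DelaySubscription(TimeSpan, IScheduler)` overload.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class PaceExtensions
{
    // Hypothetical scheduler-aware variant of Pace (an assumption, not the
    // original answer's code). Each source item becomes an inner sequence
    // that emits the item immediately and then completes only after
    // `interval` has elapsed on `scheduler`. Concat subscribes to the next
    // inner sequence once the previous one has completed, which spaces
    // emissions at least `interval` apart.
    public static IObservable<T> Pace<T>(
        this IObservable<T> source,
        TimeSpan interval,
        IScheduler scheduler)
        => source
            .Select(p => Observable.Empty<T>()
                .DelaySubscription(interval, scheduler)
                .StartWith(p))
            .Concat();
}
```

In the question's test this would be used as `initial.Pace(configuredDelay, testScheduler).Select(_ => Unit.Default)` in place of the Throttle call. I have not verified the exact tick values it produces there, so the asserted times may need adjusting.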