Conditional delay+throttle operator


I'm writing a custom Rx operator that combines features of both Throttle and Delay, with the following signature:

public static IObservable<T> DelayWhen<T>(this IObservable<T> self, TimeSpan delay, Func<T, bool> condition);

The rules are as follows:

  1. If condition(t) returns false, emit immediately.
  2. If condition(t) returns true, delay emission by the delay time span.
  3. If self emits a value during a delay then do the following:
    1. If condition(t) returns false, cancel/skip the value scheduled for delayed emission and emit the new value
    2. If condition(t) returns true then skip/ignore this new value (i.e. the delayed value will emit if self does not emit any more values during the delay).

As you can tell from the rules, there is some behavior reminiscent of throttling going on here.

My various attempts at solving this include some async approaches that just grew too complex. I really feel this should be solvable using existing operators. E.g. see https://stackoverflow.com/a/16290788/2149075, which uses Amb quite neatly and which I feel is really close to what I want to achieve.


Solution

  • The question isn't completely clear, so I'm using the following test case as a scenario:

    Observable.Interval(TimeSpan.FromSeconds(1))
        .Take(10)
        .DelayWhen(TimeSpan.FromSeconds(1.5), i => i % 3 == 0 || i % 2 == 0)
    

    This should result in the following:

    //        T: ---1---2---3---4---5---6---7---8---9---0---1----
    // original: ---0---1---2---3---4---5---6---7---8---9
    //   delay?: ---Y---N---Y---Y---Y---N---Y---N---Y---Y
    // expected: -------1---------2-----5-------7---------8
    //
    // 0: Delayed, but interrupted by 1.
    // 1: Non-delayed, emit immediately.
    // 2: Delayed, emit after 1.5 seconds.
    // 3: Delayed, but arrives during a delay, so ignored.
    // 4: Delayed, but interrupted by 5.
    // 5: Non-delayed, emit immediately.
    // 6: Delayed, but interrupted by 7.
    // 7: Non-delayed, emit immediately.
    // 8: Delayed, emit after 1.5 seconds.
    // 9: Delayed, but arrives during a delay, so ignored.
    
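    As a sanity check on the expected line above, here is a minimal sketch (not part of the operator, and not using Rx) that replays timestamped inputs through the three rules. The names DelayWhenRules and Simulate are hypothetical, times are plain seconds as doubles, and it assumes that a delay expiring at the exact moment a new value arrives is emitted first:

    ```csharp
    using System;
    using System.Collections.Generic;

    public static class DelayWhenRules
    {
        // Hypothetical helper: replays (time, value) inputs through rules 1-3
        // and returns the (time, value) pairs that should be emitted.
        public static List<(double Time, T Value)> Simulate<T>(
            IEnumerable<(double Time, T Value)> inputs, double delay, Func<T, bool> condition)
        {
            var output = new List<(double Time, T Value)>();
            (double DueTime, T Value)? pending = null; // value currently waiting out its delay

            foreach (var (time, value) in inputs)
            {
                // Assumption: a pending value whose delay has run out is emitted first.
                if (pending is { } p && p.DueTime <= time)
                {
                    output.Add((p.DueTime, p.Value));
                    pending = null;
                }

                if (!condition(value))
                {
                    pending = null;                  // rule 3.1: cancel any pending value
                    output.Add((time, value));       // rule 1: emit immediately
                }
                else if (pending is null)
                {
                    pending = (time + delay, value); // rule 2: start a new delay
                }
                // rule 3.2: a delayed value arriving during a delay is ignored
            }

            // Inputs exhausted: whatever is still waiting is emitted at its due time.
            if (pending is { } last) output.Add((last.DueTime, last.Value));
            return output;
        }
    }
    ```

    Feeding it the ten inputs from the scenario (value i at time i + 1, delay 1.5) yields (2, 1), (4.5, 2), (6, 5), (8, 7), (10.5, 8), which matches the marble diagram.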

    If that doesn't line up with the requirements, please clarify the question. @Theodore's solution gets the timing right, but emits 3 and 9, ignoring the "skip/ignore this new value" clause for delayed values that arrive during an existing delay.

    This is functionally equivalent to Theodore's code, but (IMO) easier to work with and understand:

    public static IObservable<T> DelayWhen2<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition, IScheduler scheduler)
    {
        return source
            .Select(x => (Item: x, WithDelay: condition(x)))
            .Publish(published => published
                .SelectMany(t => t.WithDelay 
                    ? Observable.Return(t)
                        .Delay(delay, scheduler)
                        .TakeUntil(published.Where(t2 => !t2.WithDelay))
                    : Observable.Return(t)
                )
            )
            .Select(e => e.Item);
    }
    
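    The cancellation in DelayWhen2 comes from TakeUntil: a value that is waiting out its Delay is dropped as soon as the other sequence produces anything. Here is a minimal sketch of just that piece, assuming the System.Reactive and Microsoft.Reactive.Testing packages are referenced. TakeUntilCancellationDemo and Run are hypothetical names, and Observable.Timer stands in for the "a non-delayed value arrived" signal:

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Reactive.Linq;
    using Microsoft.Reactive.Testing;

    public static class TakeUntilCancellationDemo
    {
        // Hypothetical helper: returns the values that survive when a cancel
        // signal fires 'cancelAfter' into a delay of length 'delay'.
        public static IList<int> Run(TimeSpan delay, TimeSpan cancelAfter)
        {
            var scheduler = new TestScheduler();

            // Stand-in for published.Where(t2 => !t2.WithDelay).
            var cancel = Observable.Timer(cancelAfter, scheduler);

            var received = new List<int>();
            Observable.Return(42)
                .Delay(delay, scheduler)   // hold the value back in virtual time
                .TakeUntil(cancel)         // drop it if the signal fires first
                .Subscribe(x => received.Add(x));

            scheduler.Start();             // run virtual time until no work is left
            return received;
        }
    }
    ```

    With a 1.5 second delay, a signal after 1 second should leave the list empty, while a signal after 2 seconds should let the delayed 42 through.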

    From there, I had to track whether a delay is currently in progress, which I did with .Scan:

    public static IObservable<T> DelayWhen3<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition)
    {
        return DelayWhen3(source, delay, condition, Scheduler.Default);
    }
    
    public static IObservable<T> DelayWhen3<T>(this IObservable<T> source, TimeSpan delay, Func<T, bool> condition, IScheduler scheduler)
    {
        return source
            .Select(x => (Item: x, WithDelay: condition(x)))
            .Publish(published => published
                .Timestamp(scheduler)
                .Scan((delayOverTime: DateTimeOffset.MinValue, output: Observable.Empty<T>()), (state, t) => {
                    if (!t.Value.WithDelay)
                        // Value isn't delayed; the current delay status is irrelevant. Emit immediately and cancel any previous delay.
                        return (DateTimeOffset.MinValue, Observable.Return(t.Value.Item));
                    else if (state.delayOverTime > t.Timestamp)
                        // Value should be delayed, but a delay is already in progress. Ignore the value.
                        return (state.delayOverTime, Observable.Empty<T>());
                    else
                        // Value should be delayed and no delay is in progress. Record when the delay ends and return the delayed observable.
                        return (t.Timestamp + delay, Observable.Return(t.Value.Item).Delay(delay, scheduler).TakeUntil(published.Where(t2 => !t2.WithDelay)));
                })
            )
            .SelectMany(t => t.output);
    }
    

    In the .Scan operator, you embed the time when the previous Delay expires. That way you can handle a value that should be delayed but arrives during an existing delay. I added scheduler parameters to the time-sensitive functions to enable testing:

    var ts = new TestScheduler();
    
    var target = Observable.Interval(TimeSpan.FromSeconds(1), ts)
        .Take(10)
        .DelayWhen3(TimeSpan.FromSeconds(1.5), i => i % 3 == 0 || i % 2 == 0, ts);
    
    var observer = ts.CreateObserver<long>();
    target.Subscribe(observer);
    ts.Start();
    
    var expected = new List<Recorded<Notification<long>>> {
        new Recorded<Notification<long>>(2000.MsTicks(), Notification.CreateOnNext<long>(1)),
        new Recorded<Notification<long>>(4500.MsTicks(), Notification.CreateOnNext<long>(2)),
        new Recorded<Notification<long>>(6000.MsTicks(), Notification.CreateOnNext<long>(5)),
        new Recorded<Notification<long>>(8000.MsTicks(), Notification.CreateOnNext<long>(7)),
        new Recorded<Notification<long>>(10500.MsTicks(), Notification.CreateOnNext<long>(8)),
        new Recorded<Notification<long>>(10500.MsTicks() + 1, Notification.CreateOnCompleted<long>()),
    };
    
    ReactiveAssert.AreElementsEqual(expected, observer.Messages);
    

    And the code for the MsTicks extension method:

    public static long MsTicks(this int i)
    {
        return TimeSpan.FromMilliseconds(i).Ticks;
    }