Search code examples
c#system.reactive

Rx: How can I respond immediately, and throttle subsequent requests


I would like to set up an Rx subscription that can respond to an event right away, and then ignore subsequent events that happen within a specified "cooldown" period.

The out of the box Throttle/Buffer methods respond only once the timeout has elapsed, which is not quite what I need.

Here is some code that sets up the scenario, and uses a Throttle (which isn't the solution I want):

class Program
{
    static Stopwatch sw = new Stopwatch();

    static void Main(string[] args)
    {
        var subject = new Subject<int>();
        var timeout = TimeSpan.FromMilliseconds(500);

        subject
            .Throttle(timeout)
            .Subscribe(DoStuff);

        var factory = new TaskFactory();
         
        sw.Start();

        factory.StartNew(() =>
        {
            Console.WriteLine("Batch 1 (no delay)");
            subject.OnNext(1);
        });

        factory.StartNewDelayed(1000, () =>
        {
            Console.WriteLine("Batch 2 (1s delay)");
            subject.OnNext(2);
        });
 
        factory.StartNewDelayed(1300, () =>
        {
            Console.WriteLine("Batch 3 (1.3s delay)");
            subject.OnNext(3);
        });

        factory.StartNewDelayed(1600, () =>
        {
            Console.WriteLine("Batch 4 (1.6s delay)");
            subject.OnNext(4);
        });

        Console.ReadKey();
        sw.Stop();
    }

    private static void DoStuff(int i)
    {
        Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds);
    }
}

The output of running this right now is:

Batch 1 (no delay)

Handling 1 at 508ms

Batch 2 (1s delay)

Batch 3 (1.3s delay)

Batch 4 (1.6s delay)

Handling 4 at 2114ms

Note that batch 2 isn't handled (which is fine!) because we wait for 500ms to elapse between requests due to the nature of throttle. Batch 3 is also not handled, (which is less alright because it happened more than 500ms from batch 2) due to its proximity to Batch 4.

What I'm looking for is something more like this:

Batch 1 (no delay)

Handling 1 at ~0ms

Batch 2 (1s delay)

Handling 2 at ~1000s

Batch 3 (1.3s delay)

Batch 4 (1.6s delay)

Handling 4 at ~1600s

Note that batch 3 wouldn't be handled in this scenario (which is fine!) because it occurs within 500ms of Batch 2.

EDIT:

Here is the implementation for the "StartNewDelayed" extension method that I use:

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <returns>A Task that will be completed after the specified duration.</returns>
public static Task StartNewDelayed(
    this TaskFactory factory, int millisecondsDelay)
{
    return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None);
}

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the timed task.</param>
/// <returns>A Task that will be completed after the specified duration and that's cancelable with the specified token.</returns>
public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken)
{
    // Validate arguments
    if (factory == null) throw new ArgumentNullException("factory");
    if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay");

    // Create the timed task
    var tcs = new TaskCompletionSource<object>(factory.CreationOptions);
    var ctr = default(CancellationTokenRegistration);

    // Create the timer but don't start it yet.  If we start it now,
    // it might fire before ctr has been set to the right registration.
    var timer = new Timer(self =>
    {
        // Clean up both the cancellation token and the timer, and try to transition to completed
        ctr.Dispose();
        ((Timer)self).Dispose();
        tcs.TrySetResult(null);
    });

    // Register with the cancellation token.
    if (cancellationToken.CanBeCanceled)
    {
        // When cancellation occurs, cancel the timer and try to transition to cancelled.
        // There could be a race, but it's benign.
        ctr = cancellationToken.Register(() =>
        {
            timer.Dispose();
            tcs.TrySetCanceled();
        });
    }

    if (millisecondsDelay > 0)
    {
        // Start the timer and hand back the task...
        timer.Change(millisecondsDelay, Timeout.Infinite);
    }
    else
    {
        // Just complete the task, and keep execution on the current thread.
        ctr.Dispose();
        tcs.TrySetResult(null);
        timer.Dispose();
    }

    return tcs.Task;
}

Solution

  • The initial answer I posted has a flaw: namely that the Window method, when used with an Observable.Interval to denote the end of the window, sets up an infinite series of 500ms windows. What I really need is a window that starts when the first result is pumped into the subject, and ends after the 500ms.

    My sample data masked this problem because the data broke down nicely into the windows that were already going to be created. (i.e. 0-500ms, 501-1000ms, 1001-1500ms, etc.)

    Consider instead this timing:

    factory.StartNewDelayed(300,() =>
    {
        Console.WriteLine("Batch 1 (300ms delay)");
        subject.OnNext(1);
    });
    
    factory.StartNewDelayed(700, () =>
    {
        Console.WriteLine("Batch 2 (700ms delay)");
        subject.OnNext(2);
    });
    
    factory.StartNewDelayed(1300, () =>
    {
        Console.WriteLine("Batch 3 (1.3s delay)");
        subject.OnNext(3);
    });
    
    factory.StartNewDelayed(1600, () =>
    {
        Console.WriteLine("Batch 4 (1.6s delay)");
        subject.OnNext(4);
    });
    

    What I get is:

    Batch 1 (300ms delay)

    Handling 1 at 356ms

    Batch 2 (700ms delay)

    Handling 2 at 750ms

    Batch 3 (1.3s delay)

    Handling 3 at 1346ms

    Batch 4 (1.6s delay)

    Handling 4 at 1644ms

    This is because the windows begin at 0ms, 500ms, 1000ms, and 1500ms and so each Subject.OnNext fits nicely into its own window.

    What I want is:

    Batch 1 (300ms delay)

    Handling 1 at ~300ms

    Batch 2 (700ms delay)

    Batch 3 (1.3s delay)

    Handling 3 at ~1300ms

    Batch 4 (1.6s delay)

    After a lot of struggling and an hour banging on it with a co-worker, we arrived at a better solution using pure Rx and a single local variable:

    bool isCoolingDown = false;
    
    subject
        .Where(_ => !isCoolingDown)
        .Subscribe(
        i =>
        {
            DoStuff(i);
    
            isCoolingDown = true;
    
            Observable
                .Interval(cooldownInterval)
                .Take(1)
                .Subscribe(_ => isCoolingDown = false);
        });
    

    Our assumption is that calls to the subscription method are synchronized. If they are not, then a simple lock could be introduced.