Tags: c#, .net, system.reactive, reactive, rx.net

How to cancel a debounced Rx event A if a different event B has occurred?


I need to cancel a debounced Rx event A if a different event B has occurred. A contrived example: ignore a debounced keyboard keystroke if a mouse button was clicked in the meantime.

Below I simulate events A and B via timer delays. A is debounced using the Rx.NET Throttle operator:

var subjA = new Subject<int>();
var subjB = new Subject<Unit>();

// desired output: 3 (because B occurs at 150 ms on the timeline)
// actual output: 2, 3

subjA
    .Throttle(TimeSpan.FromMilliseconds(200))
    .Subscribe(s => Console.WriteLine(s));

await Task.WhenAll(EmitA(), EmitB(), Task.Delay(2000));

async Task EmitA()
{
    subjA!.OnNext(1);
    await Task.Delay(100);
    subjA!.OnNext(2);
    await Task.Delay(500);
    subjA!.OnNext(3);
}

async Task EmitB()
{
    await Task.Delay(150);
    subjB!.OnNext(Unit.Default);
}

I can solve this by giving up Throttle and using Select/Delay/TakeUntil/Switch instead:

#nullable enable
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using System;

var subjA = new Subject<int>();
var subjB = new Subject<Unit>();

subjA.Select(n =>
        Observable.Return(n).Delay(TimeSpan.FromMilliseconds(200))
        .TakeUntil(subjB))
    .Switch()
    .Subscribe(s => Console.WriteLine(s));

await Task.WhenAll(EmitA(), EmitB(), Task.Delay(2000));

async Task EmitA()
{
    subjA!.OnNext(1);
    await Task.Delay(100);
    subjA!.OnNext(2);
    await Task.Delay(500);
    subjA!.OnNext(3);
}

async Task EmitB()
{
    await Task.Delay(150);
    subjB!.OnNext(Unit.Default);
}

However, this feels like a complicated approach to what must be a common Rx scenario. Is there a more elegant way of solving this?


Marble diagram:

subjA:  +---1---2---------------3---------|
subjB:  +----------U-------------------------------|
Result: +-----------------------------3---|

Solution

  • I feel like this might be what you're looking for:

    IObservable<int> query =
        Observable
            .Merge(
                subjA.Select(a => (int?)a),
                subjB.Select(b => (int?)null))
            .Throttle(TimeSpan.FromMilliseconds(200.0))
            .Where(x => x.HasValue)
            .Select(x => x.Value);
        
    query
        .Subscribe(s => Console.WriteLine(s));
    

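    This merges both sequences before the Throttle, mapping every subjB event to null. Because Throttle restarts its timer on each incoming item and keeps only the latest one, a subjB event arriving within the 200 ms window replaces the pending subjA value with null. The final Where/Select then drops those nulls, so only values that come from subjA are emitted.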
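
    To check the merge-then-throttle idea without depending on real timers, here is a minimal sketch that wraps it in an extension method and replays the question's timeline in virtual time. The names ThrottleUnlessCancelled and CancellableThrottleExtensions are hypothetical (they are not part of Rx.NET), and a tuple flag is used in place of int? so the helper also works for reference types. HistoricalScheduler ships with System.Reactive; the start date passed to it is arbitrary.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Virtual-time scheduler: the clock only moves when we advance it,
// so the run is deterministic. The start date is arbitrary.
var scheduler = new HistoricalScheduler(
    new DateTimeOffset(2000, 1, 1, 0, 0, 0, TimeSpan.Zero));

var subjA = new Subject<int>();
var subjB = new Subject<Unit>();
var results = new List<int>();

using var subscription = subjA
    .ThrottleUnlessCancelled(subjB, TimeSpan.FromMilliseconds(200), scheduler)
    .Subscribe(x => results.Add(x));

// Replay the question's timeline:
// A=1 at 0 ms, A=2 at 100 ms, B at 150 ms, A=3 at 600 ms.
scheduler.Schedule(TimeSpan.Zero, () => subjA.OnNext(1));
scheduler.Schedule(TimeSpan.FromMilliseconds(100), () => subjA.OnNext(2));
scheduler.Schedule(TimeSpan.FromMilliseconds(150), () => subjB.OnNext(Unit.Default));
scheduler.Schedule(TimeSpan.FromMilliseconds(600), () => subjA.OnNext(3));

// Run two seconds of virtual time, then show what came out.
scheduler.AdvanceBy(TimeSpan.FromSeconds(2));
Console.WriteLine(string.Join(", ", results));

// Hypothetical helper, not part of Rx.NET.
public static class CancellableThrottleExtensions
{
    // Debounces `source`; any `canceller` event inside the window
    // discards the pending `source` value.
    public static IObservable<T> ThrottleUnlessCancelled<T, TCancel>(
        this IObservable<T> source,
        IObservable<TCancel> canceller,
        TimeSpan dueTime,
        IScheduler scheduler)
    {
        return Observable
            .Merge(
                // Tag real values so they can be told apart from cancellations.
                source.Select(value => (HasValue: true, Value: value)),
                canceller.Select(_ => (HasValue: false, Value: default(T)!)))
            .Throttle(dueTime, scheduler)
            .Where(x => x.HasValue)
            .Select(x => x.Value);
    }
}
```

    Under these assumptions the run should print only 3: the B event at 150 ms replaces the pending 2 inside the throttle window, and the marker is filtered out when the window elapses at 350 ms. In production code you would likely pass the default scheduler (or add an overload without the scheduler parameter); the explicit scheduler here exists only to make the timing reproducible.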