I need to cancel a debounced Rx event A
if a different event B
has occured. A
contrived example: ignore a debounced keyboard keystroke if a mouse button was clicked meanwhile.
Below I simulate events A
and B
via timer delays. A
is debounced using Rx.NET Throttle
operator:
var subjA = new Subject<int>();
var subjB = new Subject<Unit>();
// desired output: 3 (because B occcurs at 150ms timeline)
// actual output: 2, 3
subjA
.Throttle(TimeSpan.FromMilliseconds(200)).
.Subscribe(s => Console.WriteLine(s));
await Task.WhenAll(EmitA(), EmitB(), Task.Delay(2000));
async Task EmitA()
{
subjA!.OnNext(1);
await Task.Delay(100);
subjA!.OnNext(2);
await Task.Delay(500);
subjA!.OnNext(3);
}
async Task EmitB()
{
await Task.Delay(150);
subjB!.OnNext(Unit.Default);
}
I can solve this by giving up Throttle
and using Select
/Delay
/TakeUntil
/Switch
, try the fiddle:
#nullable enable
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using System;
var subjA = new Subject<int>();
var subjB = new Subject<Unit>();
subjA.Select(n =>
Observable.Return(n).Delay(TimeSpan.FromMilliseconds(200))
.TakeUntil(subjB))
.Switch()
.Subscribe(s => Console.WriteLine(s));
await Task.WhenAll(EmitA(), EmitB(), Task.Delay(2000));
async Task EmitA()
{
subjA!.OnNext(1);
await Task.Delay(100);
subjA!.OnNext(2);
await Task.Delay(500);
subjA!.OnNext(3);
}
async Task EmitB()
{
await Task.Delay(150);
subjB!.OnNext(Unit.Default);
}
Though, this feels like a complicated approach to what must be a common Rx scenario. Is there an elegant way of solving this?
Marble diagram:
subjA: +---1---2---------------3---------|
subjB: +----------U-------------------------------|
Result: +-----------------------------3---|
I feel like this might be what you're looking for:
IObservable<int> query =
Observable
.Merge(
subjA.Select(a => (int?)a),
subjB.Select(b => (int?)null))
.Throttle(TimeSpan.FromMilliseconds(200.0))
.Where(x => x.HasValue)
.Select(x => x.Value);
query
.Subscribe(s => Console.WriteLine(s));
That's combining both sequences before the Throttle
and then only emitting values that come from subjA
.