Search code examples
c#system.reactive

How to throttle events RX?


I am trying to throttle an event being triggered by a slider when the value has been changed I would like to throttle to 1 seconds, its Print every second but the data being printed is incorrect all value in the collection is being printed at the same time but I would like to print the last value of that Unique data type every second, result posted below.

[Fully reproducible code] XAML:

<Grid>
    <TextBox x:Name="txtbilly" Text="{Binding Position, Mode=TwoWay , UpdateSourceTrigger=PropertyChanged}" Margin="0,0,0,382" ></TextBox>
    <TextBox x:Name="txtbob" Text="{Binding Position, Mode=TwoWay , UpdateSourceTrigger=PropertyChanged}" Margin="0,162,0,233" ></TextBox>
    <Slider x:Name="slbilly"  Value="{Binding Position, Mode=TwoWay , UpdateSourceTrigger=PropertyChanged}" Margin="0,57,0,306" GotMouseCapture="slbilly_GotMouseCapture"/>
    <Slider x:Name="slbob"  Value="{Binding Position, Mode=TwoWay, UpdateSourceTrigger=PropertyChanged}" Margin="0,206,0,171" GotMouseCapture="slbob_GotMouseCapture" />
</Grid>

Code behind:

public partial class MainWindow : Window
{
    Student billy = new Student("Unique1", 0.0f);
    Student Bob = new Student("Unique2", 0.0f);
    public MainWindow()
    {
        InitializeComponent();
        txtbilly.DataContext = billy;
        slbilly.DataContext = billy;
        billy.PropertyChanged += Students_PropertyChanged;

        txtbob.DataContext = Bob;
        slbob.DataContext = Bob;
        Bob.PropertyChanged += Students_PropertyChanged;

    }
    static IList<Student> studentsList = new List<Student>();
    IObservable<Student> observable;
    private void Students_PropertyChanged(object sender, System.ComponentModel.PropertyChangedEventArgs e)
    {
        var student = (Student)sender;
        studentsList.Add(student);
        Debug.Print("List COunt ==>>>" + studentsList.Count.ToString() + "\n");

        observable = studentsList.ToObservable(Scheduler.Default);

        observable.Throttle(TimeSpan.FromSeconds(1), Scheduler.Default);
    }

    private void slbilly_GotMouseCapture(object sender, MouseEventArgs e)
    {
        observable?.Subscribe(i => Debug.Print("{0}\nPos {1}\n Time Received {2}\nOBS Count{3}", i?.ID?.ToString(), i.Position.ToString(), DateTime.Now.ToString(), observable.Count<Student>().ToString()));
    }


    private void slbob_GotMouseCapture(object sender, MouseEventArgs e)
    {

        observable?.Subscribe(i => Debug.Print("{0}\nPos {1}\n Time Received {2}\nOBS Count{3}", i?.ID?.ToString(), i.Position.ToString(), DateTime.Now.ToString(), observable.Count<Student>().ToString()));
    }
}

Model:

public class Student : INotifyPropertyChanged
{
    private string _ID;
    public string ID
    {
        get
        {
            return _ID;
        }
        set
        {
            _ID = value;
            OnPropertyChanged(new PropertyChangedEventArgs("ID"));
        }
    }
    private float _Position;
    public float Position
    {
        get
        {
            return _Position;
        }
        set
        {
            _Position = value;
            OnPropertyChanged(new PropertyChangedEventArgs("Position"));
        }
    }
    public Student(string id, float position)
    {

        this.Position = position;
        this.ID = id;

    }

    public event PropertyChangedEventHandler PropertyChanged;
    public void OnPropertyChanged(PropertyChangedEventArgs e)
    {
        if (PropertyChanged != null)
        {
            PropertyChanged(this, e);
        }
    }
}

Result:

  1. Unique1 Pos 0.06468305 Time Received 18/04/2022 00:16:05

  2. Unique1 Pos 0.06468305 Time Received 18/04/2022 00:16:05 OBS

  3. Unique1 Pos 0.06468305 Time Received 18/04/2022 00:16:05 OBS

  4. Unique1 Pos 0.06468305 Time Received 18/04/2022 00:16:05 OBS

  5. Unique1 Pos 0.06468305 Time Received 18/04/2022 00:16:05 OBS

Expected: If the user starts at 0.0 and drag from 0.0 to 300.98 in 1 seconds, then to 200.45 by 2 seconds:

Unique1 Pos 0.0 Time Received 18/04/2022 00:16:05

Unique1 Pos 300.98 Time Received 18/04/2022 00:16:06

Unique1 Pos 200.45 Time Received 18/04/2022 00:16:07

Issues:

  1. studentsList counts keeps growing and not reset by the observable(Every time the use release the mouse from the control it should reset the list.

  2. InvalidOperationException: 'Collection was modified; enumeration operation may not execute.'

  3. The Debug.Print only gets printed/execute once I release the mouse from the drag motion

@Enigmativity answer sorted my forth known issue.

I am new to the Rx technology so I am not sure if I am implementing it in the correct manner.

Basically if the Value changed is coming from the same control I would like to throttle it, e.g. on the slider control when the user click the knob to drag start throttling once the user releases the mouse click stop throttling


Solution

  • In an attempt to show you what you probably need to get your observable to work, it should look something like this:

    var observable =
        Observable
            .Merge(
                Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(h => billy.PropertyChanged += h, h => billy.PropertyChanged += h).Select(x => billy),
                Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(h => Bob.PropertyChanged += h, h => Bob.PropertyChanged += h).Select(x => Bob))
            .Throttle(TimeSpan.FromSeconds(5.0))
            .ObserveOn(SynchronizationContext.Current)
            .Do(s =>
            {
                studentsList.Add(s);
                Debug.Print($"List Count ==>>>{studentsList.Count}{Environment.NewLine}");
            })
            .Select(s => Observable.FromAsync(() => ExpensiveHttpCall(s)).Select(u => new { Student = s, Result = u }))
            .Switch();
    

    And the subscription, like this:

    _subscription =
        observable
        .ObserveOn(SynchronizationContext.Current)
            .Subscribe(x =>
            {
                Debug.Print($"{x.Student.ID}{Environment.NewLine}Pos {x.Student.Position}{Environment.NewLine}Time Received {DateTime.Now.ToString()}{Environment.NewLine}List Count{studentsList.Count()}");
            });
    

    I added this code to simulate a long-running HTTP call:

        private async Task<Unit> ExpensiveHttpCall(Student student)
        {
            await Task.Delay(TimeSpan.FromSeconds(5.0));
            return Unit.Default;
        }
    

    Now I don't know exactly what you're trying to do with this query, but this should help to move you in the right direction.

    Key to Rx is to try to get your query down to one single query that you subscribe to once. It makes life much easier.

    Here's the complete code:

    public partial class MainWindow : Window
    {
        Student billy = new Student("Unique1", 0.0f);
        Student Bob = new Student("Unique2", 0.0f);
        public MainWindow()
        {
            InitializeComponent();
            txtbilly.DataContext = billy;
            slbilly.DataContext = billy;
    
            txtbob.DataContext = Bob;
            slbob.DataContext = Bob;
    
            var observable =
                Observable
                    .Merge(
                        Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(h => billy.PropertyChanged += h, h => billy.PropertyChanged += h).Select(x => billy),
                        Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(h => Bob.PropertyChanged += h, h => Bob.PropertyChanged += h).Select(x => Bob))
                    .Throttle(TimeSpan.FromSeconds(5.0))
                    .ObserveOn(SynchronizationContext.Current)
                    .Do(s =>
                    {
                        studentsList.Add(s);
                        Debug.Print($"List Count ==>>>{studentsList.Count}{Environment.NewLine}");
                    })
                    .Select(s => Observable.FromAsync(() => ExpensiveHttpCall(s)).Select(u => new { Student = s, Result = u }))
                    .Switch();
    
            _subscription =
                observable
                .ObserveOn(SynchronizationContext.Current)
                    .Subscribe(x =>
                    {
                        Debug.Print($"{x.Student.ID}{Environment.NewLine}Pos {x.Student.Position}{Environment.NewLine}Time Received {DateTime.Now.ToString()}{Environment.NewLine}List Count{studentsList.Count()}");
                    });
        }
    
        private IDisposable? _subscription = null;
    
        private async Task<Unit> ExpensiveHttpCall(Student student)
        {
            await Task.Delay(TimeSpan.FromSeconds(5.0));
            return Unit.Default;
        }
    
        static IList<Student> studentsList = new List<Student>();
    }