c#, system.reactive

How to group and throttle objects by ID in Rx


I have incoming objects of the same type. If an object's IsThrottlable property is set to false, I don't want to throttle it, regardless of its ID. If IsThrottlable is set to true, I would like to throttle the objects by ID every 3 seconds. So if an object with the same ID comes in 50 times within 3 seconds, I would like to call HTTPSend only for the last object.

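using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace BoatThrottle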
{
    class MData
    {
        public int ID { get; set; }
        public bool IsThrottlable { get; set; }
        public string Description { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            Random rand = new Random();

            while (true)
            {
                var data = GenerateRandomObj(rand);
                SendData(data);
                // Block until the delay elapses; a Task.Delay that is never awaited returns immediately.
                Task.Delay(rand.Next(100, 2000)).Wait();
            }
        }

        static MData GenerateRandomObj(Random rand)
        {
            return new MData() { ID = rand.Next(1, 20), Description = "Notification....", IsThrottlable = (rand.Next(2) == 1) };
        }

        static void SendData(MData mData)
        {
            if (mData.IsThrottlable)
            {
                _doValues.OnNext(mData);
                var dd = ThrottledById(DoValues);

                var observable =
                   dd
                    .Throttle(TimeSpan.FromMilliseconds(3000.0))
                    .ObserveOn(Scheduler.ThreadPool.DisableOptimizations());

                _subscription =
                    observable
                    .ObserveOn(Scheduler.ThreadPool.DisableOptimizations())
                        .Subscribe(y =>
                        {
                            HTTPSend(y);
                        });

            }
            else
            {
                // IsThrottlable is false: always send this object immediately, with no throttling.
                HTTPSend(mData);
            }

        }
        private static IDisposable? _subscription = null;

        public static IObservable<MData> ThrottledById(IObservable<MData> observable)
        {
            return observable.Buffer(TimeSpan.FromSeconds(3))
                .SelectMany(x =>
                    x.GroupBy(y => y.ID)
                    .Select(y => y.Last()));
        }

        private static readonly Subject<MData> _doValues = new Subject<MData>();

        public static IObservable<MData> DoValues { get { return _doValues; } }

        static void HTTPSend(MData mData)
        {
            Console.WriteLine("===============HTTP===>>  " + mData.ID + "  " + mData.Description + " " + mData.IsThrottlable);
        }
    }
}

EDIT:

For example, suppose all of the following are received within 3 seconds:

  • MData ID = 1, IsThrottlable = False, Description = "Notify"

  • MData ID = 2, IsThrottlable = True, Description = "Notify1"

  • MData ID = 2, IsThrottlable = True, Description = "Notify2"

  • MData ID = 9, IsThrottlable = False, Description = "Notify2"

  • MData ID = 2, IsThrottlable = True, Description = "Notify3"

  • MData ID = 2, IsThrottlable = True, Description = "Notify4"

  • MData ID = 3, IsThrottlable = True, Description = "Notify"

  • MData ID = 4, IsThrottlable = True, Description = "Notify"

  • MData ID = 5, IsThrottlable = True, Description = "Notify1"

  • MData ID = 5, IsThrottlable = True, Description = "Notify2"

  • MData ID = 8, IsThrottlable = True, Description = "Notify1"

  • MData ID = 8, IsThrottlable = True, Description = "Notify2"

  • MData ID = 8, IsThrottlable = True, Description = "Notify3"

  • MData ID = 8, IsThrottlable = True, Description = "Notify4"

  • MData ID = 8, IsThrottlable = True, Description = "Notify5"

  • MData ID = 8, IsThrottlable = True, Description = "Notify6"

Expected output after the first 3 seconds:

  • MData ID = 1, IsThrottlable = False, Description = "Notify"
  • MData ID = 9, IsThrottlable = False, Description = "Notify2"
  • MData ID = 2, IsThrottlable = True, Description = "Notify4"
  • MData ID = 3, IsThrottlable = True, Description = "Notify"
  • MData ID = 4, IsThrottlable = True, Description = "Notify"
  • MData ID = 5, IsThrottlable = True, Description = "Notify2"
  • MData ID = 8, IsThrottlable = True, Description = "Notify6"

Solution

  • One way to do it is to group the sequence by the IsThrottlable property. This way you'll get a nested sequence that contains two subsequences, one containing the throttleable elements and one containing the non-throttleable elements. You can then transform each of the two subsequences accordingly, and finally use the SelectMany operator to flatten the nested sequence back to a flat sequence that contains the elements emitted by the two transformed subsequences.

    The subsequence that contains the non-throttleable elements needs no transformation, so you can return it as is.

    The subsequence that contains the throttleable elements needs to be grouped further by the ID property, producing even thinner subsequences that contain only throttleable elements having the same id. These are the sequences that need to be throttled:

    IObservable<MData> throttled = source
        .GroupBy(x => x.IsThrottlable)
        .SelectMany(g1 =>
        {
            if (!g1.Key) return g1; // Not throttleable, return it as is.
            return g1
                .GroupBy(x => x.ID)
                .SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3)));
        });
    

    At the end you'll get a flat sequence that contains both the throttleable and the non-throttleable items, with the throttleable items already throttled by id.
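
    To show how the query might be wired up end to end, here is a minimal sketch. It assumes the System.Reactive package, that `source` is a `Subject<MData>` fed by the producer, and that the query is built and subscribed to once at startup rather than on every incoming item. The `HTTPSend` body is a stand-in for the real call, and the sample items are illustrative only.

    ```csharp
    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    class MData
    {
        public int ID { get; set; }
        public bool IsThrottlable { get; set; }
        public string Description { get; set; } = "";
    }

    static class Program
    {
        // Stand-in for the real HTTP call (assumption for this sketch).
        static void HTTPSend(MData m) =>
            Console.WriteLine($"HTTP => {m.ID} {m.Description} {m.IsThrottlable}");

        static void Main()
        {
            // Assumed entry point for incoming objects.
            var source = new Subject<MData>();

            IObservable<MData> throttled = source
                .GroupBy(x => x.IsThrottlable)
                .SelectMany(g1 =>
                {
                    if (!g1.Key) return g1; // Not throttleable, pass through as is.
                    return g1
                        .GroupBy(x => x.ID)
                        .SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3)));
                });

            // Subscribe exactly once, before any items are pushed.
            using IDisposable subscription = throttled.Subscribe(HTTPSend);

            // Non-throttleable item: forwarded as soon as it is pushed.
            source.OnNext(new MData { ID = 1, IsThrottlable = false, Description = "Notify" });

            // Burst of throttleable items sharing one ID: only the last one
            // should be forwarded, once the group has been quiet for 3 seconds.
            for (int i = 1; i <= 4; i++)
            {
                source.OnNext(new MData { ID = 2, IsThrottlable = true, Description = "Notify" + i });
            }

            Console.ReadLine(); // Keep the process alive while the timers run.
        }
    }
    ```

    Note that the Rx `Throttle` operator emits an item only after its group has been quiet for the full 3 seconds, so an ID that keeps receiving items more often than that is held back until the burst stops. If a fixed 3-second tick per ID is what you need, `Sample(TimeSpan.FromSeconds(3))` in place of `Throttle` may be closer, though that is a suggestion to evaluate rather than part of the original answer.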

    The SelectMany operator is essentially a combination of the Select+Merge operators.
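
    As a small illustration of that equivalence, the sketch below builds the same flattened sequence both ways. It uses plain integers instead of MData to stay self-contained, and the class name and values are made up for the example.

    ```csharp
    using System;
    using System.Reactive.Linq;

    static class SelectManyVersusMerge
    {
        static void Main()
        {
            IObservable<int> source = Observable.Range(1, 3);

            // One step: project each element to an inner sequence and flatten it.
            IObservable<int> viaSelectMany = source
                .SelectMany(x => Observable.Return(x * 10));

            // Two steps: Select produces an IObservable<IObservable<int>>,
            // and Merge flattens it back to an IObservable<int>.
            IObservable<int> viaSelectThenMerge = source
                .Select(x => Observable.Return(x * 10))
                .Merge();

            viaSelectMany.Subscribe(x => Console.WriteLine($"SelectMany: {x}"));
            viaSelectThenMerge.Subscribe(x => Console.WriteLine($"Select+Merge: {x}"));
        }
    }
    ```

    Applied to the query above, the inner step could equally be written as `.GroupBy(x => x.ID).Select(g2 => g2.Throttle(TimeSpan.FromSeconds(3))).Merge()`.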