Search code examples
c#recursionsystem.reactivereactive-programmingpolling

Using Rx create a polling request for webservice call


Using Rx in C# I am trying to create a polling request to REST API. The problem which i am facing is that, Observable need to send responses in an order. Means If request A went at X Time and request B went at X + dx time and response of B came before A the Observable expression should ignore or cancel request A.

I have written a sample code which tries to depict the scenario. How can i fix it to get only the latest response and cancel or ignore the previous responses.

 class Program
    {
        static int i = 0;

        static void Main(string[] args)
        {
            GenerateObservableSequence();

            Console.ReadLine();
        }

        private static void GenerateObservableSequence()
        {
            var timerData = Observable.Timer(TimeSpan.Zero,
                TimeSpan.FromSeconds(1));

            var asyncCall = Observable.FromAsync<int>(() =>
            {
                TaskCompletionSource<int> t = new TaskCompletionSource<int>();
                i++;

                int k = i;
                var rndNo = new Random().Next(3, 10);
                Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
                return t.Task;
            });

            var obs = from t in timerData
            from data in asyncCall
            select data;

            var hot = obs.Publish();
            hot.Connect();

                hot.Subscribe(j => 
            {
                Console.WriteLine("{0}", j);
            });
        }
    }

After @Enigmativity answer: Added Polling Aync function to always take the latest response:

 public static IObservable<T> PollingAync<T> (Func<Task<T>> AsyncCall, double TimerDuration)
        {
            return Observable
         .Create<T>(o =>
         {
             var z = 0L;
             return
                 Observable
                     .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(TimerDuration))
                     .SelectMany(nr =>
                         Observable.FromAsync<T>(AsyncCall),
                         (nr, obj) => new { nr, obj})
                     .Do(res => z = Math.Max(z, res.nr))
                     .Where(res => res.nr >= z)
                     .Select(res => res.obj)
                     .Subscribe(o);
         });

    }

Solution

  • Let's start by simplifying your code.

    This is basically the same code:

    var rnd = new Random();
    
    var i = 0;
    
    var obs =
        from n in Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
        let r = ++i
        from t in Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10)))
        select r;
    
    obs.Subscribe(Console.WriteLine);
    

    I get this kind of result:

    2
    1
    3
    4
    8
    5
    11
    6
    9
    7
    10
    

    Alternatively, this can be written as:

    var obs =
        Observable
            .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
            .Select(n => ++i)
            .SelectMany(n =>
                Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), (n, _) => n);
    

    So, now for your requirement:

    If request A went at X Time and request B went at X + dx time and response of B came before A the Observable expression should ignore or cancel request A.

    Here's the code:

    var rnd = new Random();
    
    var i = 0;
    var z = 0L;
    
    var obs =
        Observable
            .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
            .Select(n => new { n, r = ++i })
            .SelectMany(nr =>
                Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), (nr, _) => nr)
            .Do(nr => z = Math.Max(z, nr.n))
            .Where(nr => nr.n >= z)
            .Select(nr => nr.r);
    

    I don't like using .Do like that, but I can't think of an alternative yet.

    This gives this kind of thing:

    1
    5
    8
    9
    10
    11
    14
    15
    16
    17
    22
    

    Notice that the values are only ascending.

    Now, you really should use Observable.Create to encapsulate the state that you're using. So your final observable should look like this:

    var obs =
        Observable
            .Create<int>(o =>
            {
                var rnd = new Random();
                var i = 0;
                var z = 0L;
                return
                    Observable
                        .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
                        .Select(n => new { n, r = ++i })
                        .SelectMany(nr =>
                            Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))),
                            (nr, _) => nr)
                        .Do(nr => z = Math.Max(z, nr.n))
                        .Where(nr => nr.n >= z)
                        .Select(nr => nr.r)
                        .Subscribe(o);
            });