Search code examples
c#system.reactivereactive-programmingpolling

how to implement status polling using Rx.Net?


I have been playing around for 2 days now trying to embrace the realm of Reactive Programming using .Net's Reactive Extensions.

I built a use case of a status polling assuming a dummy web API and a reactive client that polls the status object.

I tried the following code :

        // Creates an observable that ticks each 1 second
        var ticksObservable = Observable.Interval(TimeSpan.FromMilliseconds(1000));



        // Creates a new observable transforming each tick to a string status requested from the api
        var coldStatusPollerObservable = ticksObservable.Select(tick =>
        {
            Console.WriteLine("Sending Request");
            var tsk = client.GetStatus(1); // Http get request to a web api resource (id == 1 just for demo)
            tsk.Wait();
            return tsk.Result;

        }
        );

       // Subscribe and print results on console
       coldStatusPollerObservable.Subscribe(
        status => Console.WriteLine(status), ex => Console.WriteLine(ex.Message)
        );

Everything was just fine and i had the expected output :

{"status":"waiting"}
{"status":"running"}
{"status":"running"}
{"status":"running"}
{"status":"ok"}

Then I added another constraint which is a random Bad Request returned from the web API. The problem that occurred is that i couldn't handle the exception properly. The exception occurs in the tsk.wait() and what i expected is that it would only trigger the onError action i passed to Subscribe method ( ex => Console.WriteLine(ex.Message) )

Q1: What is the right way to handle exceptions in this case ? Q2: are there cleaner implementations for polling using Rx.NET ?

PS: I am using Rx.NET 3.1.1


Solution

  • You basically want to avoid using any .Wait() blocking calls if you can. Rx comes with an operator designed to work with tasks - Observable.FromAsync

    So your basic query now becomes:

    var coldStatusPollerObservable =
        from tick in ticksObservable
        from status in Observable.FromAsync(() => client.GetStatus(1))
        select status;
    

    If you want your Console message then do this:

    var coldStatusPollerObservable =
        from tick in ticksObservable
        from status in Observable.FromAsync(() =>
        {
            Console.WriteLine("Sending Request");
            return client.GetStatus(1);
        })
        select status;
    

    Just remember to always try to stick with the built-in operators where possible.


    You can handle exceptions like this:

    void Main()
    {
        var coldStatusPollerObservable =
            from tick in Observable.Interval(TimeSpan.FromMilliseconds(1000))
            from status in
                Observable
                    .FromAsync(() => client.GetStatus(1))
                    .Catch<string, Exception>(ex => Observable.Return("Error"))
            select status;
    
        coldStatusPollerObservable.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
    }
    
    public static class client
    {
        private static int _counter = 0;
        public static Task<string> GetStatus(int id)
        {
            if (_counter++ == 5)
                throw new Exception();
            return Task.Run(() => _counter.ToString());
        }
    }
    

    This gives:

    1
    2
    3
    4
    5
    Error
    7
    8
    9
    10
    11
    12
    13
    14
    15
    ...