I have been experimenting for two days now, trying to get into reactive programming using .NET's Reactive Extensions.
I built a status-polling use case: a dummy web API and a reactive client that polls a status object.
I tried the following code:
// Creates an observable that ticks each 1 second
var ticksObservable = Observable.Interval(TimeSpan.FromMilliseconds(1000));
// Creates a new observable transforming each tick to a string status requested from the api
var coldStatusPollerObservable = ticksObservable.Select(tick =>
{
    Console.WriteLine("Sending Request");
    var tsk = client.GetStatus(1); // Http get request to a web api resource (id == 1 just for demo)
    tsk.Wait();
    return tsk.Result;
});
// Subscribe and print results on console
coldStatusPollerObservable.Subscribe(
    status => Console.WriteLine(status),
    ex => Console.WriteLine(ex.Message)
);
Everything was fine and I got the expected output:
{"status":"waiting"}
{"status":"running"}
{"status":"running"}
{"status":"running"}
{"status":"ok"}
Then I added another constraint: the web API randomly returns a Bad Request. The problem is that I couldn't handle the exception properly. The exception occurs in tsk.Wait(), and I expected it to only trigger the onError action I passed to the Subscribe method ( ex => Console.WriteLine(ex.Message) ).
Q1: What is the right way to handle exceptions in this case?
Q2: Are there cleaner implementations for polling using Rx.NET?
PS: I am using Rx.NET 3.1.1
You basically want to avoid blocking calls such as .Wait() if you can. Rx comes with an operator designed to work with tasks: Observable.FromAsync.
So your basic query now becomes:
var coldStatusPollerObservable =
    from tick in ticksObservable
    from status in Observable.FromAsync(() => client.GetStatus(1))
    select status;
If you want to keep your console message, do this:
var coldStatusPollerObservable =
    from tick in ticksObservable
    from status in Observable.FromAsync(() =>
    {
        Console.WriteLine("Sending Request");
        return client.GetStatus(1);
    })
    select status;
Just remember to always try to stick with the built-in operators where possible.
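For readers who prefer method syntax: the two `from` clauses in the queries above are translated by the compiler into a `SelectMany` call. The following is a minimal sketch of that equivalent form, not a definitive implementation. `PollingSketch`, `GetStatusAsync`, and `CreatePoller` are hypothetical names, and the stub stands in for the question's `client.GetStatus(1)`.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class PollingSketch
{
    // Hypothetical stand-in for the question's client.GetStatus(1) HTTP call.
    private static Task<string> GetStatusAsync(int id) =>
        Task.FromResult("{\"status\":\"running\"}");

    // Method-syntax form of the two-"from" query: one request per tick.
    public static IObservable<string> CreatePoller(TimeSpan period) =>
        Observable
            .Interval(period)
            // FromAsync is deferred: the request starts only when
            // SelectMany subscribes to the inner observable for a tick.
            .SelectMany(_ => Observable.FromAsync(() => GetStatusAsync(1)));

    public static void Main()
    {
        // Take(3) only keeps the demo short; remove it to poll indefinitely.
        using (CreatePoller(TimeSpan.FromSeconds(1))
            .Take(3)
            .Subscribe(
                status => Console.WriteLine(status),
                ex => Console.WriteLine(ex.Message)))
        {
            Console.ReadLine(); // keep the process alive while polling
        }
    }
}
```

Note that `SelectMany` does not wait for one request to finish before the next tick starts another, so if a request takes longer than the polling period, requests can overlap and results can arrive out of order.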
You can handle exceptions like this:
void Main()
{
    var coldStatusPollerObservable =
        from tick in Observable.Interval(TimeSpan.FromMilliseconds(1000))
        from status in
            Observable
                .FromAsync(() => client.GetStatus(1))
                .Catch<string, Exception>(ex => Observable.Return("Error"))
        select status;
    coldStatusPollerObservable.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
}
public static class client
{
    private static int _counter = 0;
    public static Task<string> GetStatus(int id)
    {
        if (_counter++ == 5)
            throw new Exception();
        return Task.Run(() => _counter.ToString());
    }
}
This gives:
1 2 3 4 5 Error 7 8 9 10 11 12 13 14 15 ...
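The position of `Catch` matters in the example above: it is applied to the inner per-request observable, so a failed request is replaced by a fallback value while the outer interval keeps ticking. Applied to the outer sequence instead, the first error would end the polling after the fallback value. If a failed request should be retried before giving up on it, `Retry` can be placed in front of `Catch`. The sketch below illustrates this under stated assumptions: `RetryingPollerSketch`, `GetStatusAsync`, and the "every third call fails" rule are hypothetical and only simulate the random Bad Request from the question.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class RetryingPollerSketch
{
    private static int _calls;

    // Hypothetical stand-in for the web API: every third call fails.
    private static Task<string> GetStatusAsync(int id)
    {
        if (Interlocked.Increment(ref _calls) % 3 == 0)
            return Task.FromException<string>(
                new InvalidOperationException("Bad Request"));
        return Task.FromResult("running");
    }

    public static IObservable<string> CreatePoller(TimeSpan period) =>
        from tick in Observable.Interval(period)
        from status in Observable
            .FromAsync(() => GetStatusAsync(1))
            // FromAsync is deferred, so each resubscription made by
            // Retry issues a fresh request; at most 2 attempts per tick.
            .Retry(2)
            // Only reached if every attempt for this tick failed.
            .Catch<string, Exception>(
                ex => Observable.Return("Error: " + ex.Message))
        select status;

    public static void Main()
    {
        using (CreatePoller(TimeSpan.FromSeconds(1)).Subscribe(
            status => Console.WriteLine(status),
            ex => Console.WriteLine(ex.Message)))
        {
            Console.ReadLine(); // keep the process alive while polling
        }
    }
}
```

With this arrangement the `onError` handler passed to `Subscribe` is only invoked for errors that escape the inner `Catch`, for example an exception thrown by the interval source itself.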