Using Rx in C#, I am trying to create a polling request to a REST API. The problem I am facing is that the Observable needs to deliver responses in order. That is, if request A is sent at time X and request B is sent at time X + dx, and the response to B arrives before the response to A, the Observable expression should ignore or cancel request A.
I have written sample code that depicts the scenario. How can I fix it so that I get only the latest response and cancel or ignore the earlier ones?
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

class Program
{
    // Request counter; each simulated call returns its own request number.
    static int i = 0;

    static void Main(string[] args)
    {
        GenerateObservableSequence();
        Console.ReadLine();
    }

    private static void GenerateObservableSequence()
    {
        // Fire a request immediately, then once every second.
        var timerData = Observable.Timer(TimeSpan.Zero,
            TimeSpan.FromSeconds(1));

        // Simulated REST call: completes after a random 3 to 9 second delay,
        // so responses can come back out of order.
        var asyncCall = Observable.FromAsync<int>(() =>
        {
            TaskCompletionSource<int> t = new TaskCompletionSource<int>();
            i++;
            int k = i;
            var rndNo = new Random().Next(3, 10);
            Task.Delay(TimeSpan.FromSeconds(rndNo)).ContinueWith(r => { t.SetResult(k); });
            return t.Task;
        });

        var obs = from t in timerData
                  from data in asyncCall
                  select data;

        var hot = obs.Publish();
        hot.Connect();
        hot.Subscribe(j =>
        {
            Console.WriteLine("{0}", j);
        });
    }
}
After @Enigmativity's answer, I added a PollingAync function that always takes the latest response:
public static IObservable<T> PollingAync<T>(Func<Task<T>> AsyncCall, double TimerDuration)
{
    return Observable
        .Create<T>(o =>
        {
            var z = 0L;
            return
                Observable
                    .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(TimerDuration))
                    .SelectMany(nr =>
                        Observable.FromAsync<T>(AsyncCall),
                        (nr, obj) => new { nr, obj })
                    .Do(res => z = Math.Max(z, res.nr))
                    .Where(res => res.nr >= z)
                    .Select(res => res.obj)
                    .Subscribe(o);
        });
}
Let's start by simplifying your code.
This is basically the same code:
var rnd = new Random();
var i = 0;
var obs =
    from n in Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
    let r = ++i
    from t in Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10)))
    select r;
obs.Subscribe(Console.WriteLine);
I get this kind of result:
2 1 3 4 8 5 11 6 9 7 10
Alternatively, this can be written as:
var obs =
    Observable
        .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
        .Select(n => ++i)
        .SelectMany(n =>
            Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), (n, _) => n);
So, now for your requirement:
"If request A went at X Time and request B went at X + dx time and response of B came before A the Observable expression should ignore or cancel request A."
Here's the code:
var rnd = new Random();
var i = 0;
var z = 0L;
var obs =
    Observable
        .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
        .Select(n => new { n, r = ++i })
        .SelectMany(nr =>
            Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))), (nr, _) => nr)
        .Do(nr => z = Math.Max(z, nr.n))
        .Where(nr => nr.n >= z)
        .Select(nr => nr.r);
I don't like using .Do like that, but I can't think of an alternative yet.
This produces output like the following:
1 5 8 9 10 11 14 15 16 17 22
Notice that the values are only ascending.
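To see why the .Do/.Where pair yields only ascending values, here is a minimal sketch of the same rule in plain C#, with no Rx involved. The StaleFilter class and DropStale method are hypothetical names used only for illustration, and the input is treated as request numbers listed in the order their responses arrived.

```csharp
using System;
using System.Collections.Generic;

public static class StaleFilter
{
    // Yields a response only if no newer request has already been answered.
    // `arrivals` lists request numbers in the order their responses came back.
    public static IEnumerable<long> DropStale(IEnumerable<long> arrivals)
    {
        var newest = long.MinValue; // highest request number seen so far
        foreach (var n in arrivals)
        {
            // Same role as .Do(nr => z = Math.Max(z, nr.n))
            newest = Math.Max(newest, n);

            // Same role as .Where(nr => nr.n >= z)
            if (n >= newest)
                yield return n;
        }
    }
}
```

Feeding it the unfiltered output from earlier (2 1 3 4 8 5 11 6 9 7 10) yields 2 3 4 8 11: responses 1, 5, 6, 9, 7 and 10 are dropped because a later request had already been answered by the time they arrived.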
Now, you really should use Observable.Create to encapsulate the state that you're using. So your final observable should look like this:
var obs =
    Observable
        .Create<int>(o =>
        {
            var rnd = new Random();
            var i = 0;
            var z = 0L;
            return
                Observable
                    .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
                    .Select(n => new { n, r = ++i })
                    .SelectMany(nr =>
                        Observable.Timer(TimeSpan.FromSeconds(rnd.Next(3, 10))),
                        (nr, _) => nr)
                    .Do(nr => z = Math.Max(z, nr.n))
                    .Where(nr => nr.n >= z)
                    .Select(nr => nr.r)
                    .Subscribe(o);
        });
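If the side effect in .Do is a concern, one possible alternative is to carry the running maximum through Scan, so that the state lives inside the pipeline rather than in a captured variable. The sketch below is only that: LatestOnly and DropStale are hypothetical names of my own (not part of Rx), and it assumes the System.Reactive package and a compiler with C# 7 tuple support.

```csharp
using System;
using System.Reactive.Linq;

public static class LatestOnly
{
    // Hypothetical helper: keeps an item only if its sequence number is not
    // lower than every sequence number that arrived before it.
    public static IObservable<T> DropStale<T>(
        this IObservable<T> source, Func<T, long> sequenceNumber)
    {
        return source
            // The accumulator carries the highest number seen so far, the
            // current item, and whether that item should be emitted.
            .Scan<T, (long Max, T Item, bool Keep)>(
                (long.MinValue, default(T), false),
                (acc, item) =>
                {
                    var n = sequenceNumber(item);
                    return n >= acc.Max
                        ? (n, item, true)         // newest so far: keep it
                        : (acc.Max, item, false); // stale: mark for dropping
                })
            .Where(s => s.Keep)
            .Select(s => s.Item);
    }
}
```

With a helper like this, the .Do(...) and .Where(...) lines in the query above could be replaced by .DropStale(nr => nr.n), and the z variable would no longer be needed.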