
Cron Observable Sequence


I would like to create an observable sequence using Reactive Extensions (Rx) and NCrontab. The sequence would differ from something like Observable.Timer() in that the period and due time are not fixed. After reading this article, it seems that Observable.Generate() is the way to go. I am thinking of two variants: one that runs within bounds and one that runs forever. Do these implementations make sense?

public static IObservable<DateTime> Cron(string cron)
{
    var schedule = CrontabSchedule.Parse(cron);
    return Observable.Generate(DateTime.Now, d => true, d => DateTime.Now, d => d,
        d => new DateTimeOffset(schedule.GetNextOccurrence(d)));
}

public static IObservable<DateTime> Cron(string cron, DateTime start, DateTime end)
{
    var schedule = CrontabSchedule.Parse(cron);
    return Observable.Generate(start, d => d < end, d => DateTime.Now, d => d,
        d => new DateTimeOffset(schedule.GetNextOccurrence(d)));
}

Update: These seem to work empirically. However, I added an overload that takes an IScheduler, and I cannot get the sequence to trigger in a unit test. Am I using TestScheduler incorrectly, or is there an issue with the function implementation?

public static IObservable<int> Cron(string cron, IScheduler scheduler)
{
    var schedule = CrontabSchedule.Parse(cron);
    return Observable.Generate(0, d => true, d => d + 1, d => d,
        d => new DateTimeOffset(schedule.GetNextOccurrence(scheduler.Now.DateTime)), scheduler);
}

[TestClass]
public class EngineTests
{
    [TestMethod]
    public void TestCron()
    {
        var scheduler = new TestScheduler();
        var cron = "* * * * *";
        var values = new List<int>();
        var disp = ObservableCron.Cron(cron, scheduler).Subscribe(values.Add);
        scheduler.AdvanceBy(TimeSpan.TicksPerMinute - 1);
        scheduler.AdvanceBy(1);
        scheduler.AdvanceBy(1);
        Assert.IsTrue(values.Count > 0);
    }
}

Solution

  • It looks like a combination of issues. First, the Observable.Generate overload that I am using takes a Func<int, DateTimeOffset> parameter to determine the next time to trigger. I was building the new DateTimeOffset from scheduler.Now.DateTime rather than scheduler.Now.UtcDateTime, so the value was interpreted as local time instead of UTC, which caused the resulting DateTimeOffset to shift. See this question for an explanation. The correct function is below:

    public static IObservable<int> Cron(string cron, IScheduler scheduler)
    {
        var schedule = CrontabSchedule.Parse(cron);
        return Observable.Generate(0, d => true, d => d + 1, d => d,
            d => new DateTimeOffset(schedule.GetNextOccurrence(scheduler.Now.UtcDateTime)), scheduler);
    }
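
    The shift can be reproduced without Rx or NCrontab. The following is a minimal sketch using only the base class library; the class name OffsetKindDemo, its method names, and the sample instant are made up for illustration. It shows that DateTimeOffset.DateTime yields a DateTime of Kind Unspecified, which the DateTimeOffset(DateTime) constructor treats as local time, while DateTimeOffset.UtcDateTime yields Kind Utc and round-trips to the same instant.

    ```csharp
    using System;

    // Illustrative helper, not part of Rx or NCrontab.
    public static class OffsetKindDemo
    {
        // Mirrors the original code path: .DateTime drops the offset and
        // returns Kind == Unspecified, which the constructor reads as local time.
        public static DateTimeOffset ViaDateTime(DateTimeOffset now) =>
            new DateTimeOffset(now.DateTime);

        // Mirrors the corrected code path: .UtcDateTime returns Kind == Utc,
        // so the constructor assigns a zero offset and the instant is preserved.
        public static DateTimeOffset ViaUtcDateTime(DateTimeOffset now) =>
            new DateTimeOffset(now.UtcDateTime);

        public static void Main()
        {
            // Arbitrary sample instant expressed in UTC.
            var now = new DateTimeOffset(2020, 1, 1, 12, 0, 0, TimeSpan.Zero);

            Console.WriteLine(now.DateTime.Kind);           // Unspecified
            Console.WriteLine(now.UtcDateTime.Kind);        // Utc
            Console.WriteLine(ViaUtcDateTime(now) == now);  // True

            // How far the other path moves the instant depends on the
            // machine's time zone; it is zero only when local time is UTC.
            Console.WriteLine(now - ViaDateTime(now));
        }
    }
    ```

    On a machine whose local time zone is UTC both paths agree, which may explain why a problem like this can go unnoticed on some machines and fail on others.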
    

    As far as testing goes, I came up with something that demonstrates the intent a little better:

    [TestMethod]
    public void TestCronInterval()
    {
        var scheduler = new TestScheduler();
        var end = scheduler.Now.UtcDateTime.AddMinutes(10);
        const string cron = "*/5 * * * *";
        var i = 0;
        var seconds = 0;
        var sub = ObservableCron.Cron(cron, scheduler).Subscribe(x => i++);
        while (i < 2)
        {
            seconds++;
            scheduler.AdvanceBy(TimeSpan.TicksPerSecond);
        }
        Assert.AreEqual(600, seconds);
        Assert.AreEqual(end, scheduler.Now.UtcDateTime);
        sub.Dispose();
    }