Using System.Reactive.Linq for polling at an interval


I've spent hours combing through documentation and tutorials, but can't figure out how to use ReactiveX to poll an external resource (or do anything else, for that matter) at a regular interval. Below is some code I wrote to get information from a REST API at an interval.

open System
open System.Net.Http
open System.Reactive.Linq

module MyObservable =
    let getResources = 
        async {
            use client = new HttpClient()
            let! response = client.GetStringAsync("http://localhost:8080/items") |> Async.AwaitTask
            return response
        } |> Async.StartAsTask

    let getObservable (interval: TimeSpan) = 
        let f () = getResources.Result
        Observable.Interval(interval) 
        |> Observable.map(fun _ -> f ()) 

To test this out, I tried subscribing to the Observable and waiting five seconds. It does receive something every second for five seconds, but getResources only runs the first time, and that first result is then reused at every interval. How can I modify this so that the REST call is made at each interval, instead of the result of the first call being used over and over again?

let mutable res : seq<string> = Seq.empty
MyObservable.getObservable (new TimeSpan(0,0,1))
|> Observable.subscribe(fun (x: string) -> res <- Seq.append res [x])
|> ignore
Threading.Thread.Sleep(5000)

Solution

  • Don't use a Task. Tasks are what we call "hot", meaning that if you have a value of type Task in your hand, it means that the task is already running, and there is nothing you can do about it. In particular, this means you cannot restart it, or start a second instance of it. Once a Task is created, it's too late.

    In your particular case it means that getResources is not "a way to start a task", but just "a task". Already started, already running.

    If you want to start a new task every time, you have two alternatives:

    First (the worse alternative), you could make getResources a function rather than a value, which you can do by giving it a parameter:

    let getResources () = 
        async { ...
    

    And then call it with that parameter:

    let f () = getResources().Result
    

    This will run the getResources function afresh every time you call f(), which will create a new Task every time and start it.

    Second (a better option), don't use a Task at all. You're creating a perfectly good async computation and then turning it into a Task only to block on getting its result. Why? You can block on an async's result just as well!

    let getResources = async { ... }
    
    let getObservable interval =
      let f () = getResources |> Async.RunSynchronously
      ...
    

    This works, even though getResources is not a function, because asyncs, unlike Tasks, are what we call "cold". This means that, if you have an async in your hand, it doesn't mean that it's already running. async, unlike Task, represents not an "already running" computation, but rather "a way to start a computation". A corollary is that you can start it multiple times from the same async value.
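
    To see the hot/cold difference in isolation, here is a minimal sketch that needs no HTTP server. The names runs, coldWork and hotWork are made up for this illustration, and a simple counter stands in for the REST call.

    ```fsharp
    open System.Threading.Tasks

    // Counts how many times the body of the computation actually executes.
    let runs = ref 0

    // Cold: defining the async executes nothing yet.
    let coldWork : Async<int> =
        async {
            runs.Value <- runs.Value + 1
            return runs.Value
        }

    // Hot: Async.StartAsTask starts the computation right here, exactly once.
    let hotWork : Task<int> = coldWork |> Async.StartAsTask

    let fromTask1 = hotWork.Result   // waits for that single run
    let fromTask2 = hotWork.Result   // same stored result, no second run

    let fromAsync1 = coldWork |> Async.RunSynchronously   // runs the body again
    let fromAsync2 = coldWork |> Async.RunSynchronously   // and once more

    printfn "%d %d %d %d" fromTask1 fromTask2 fromAsync1 fromAsync2   // prints "1 1 2 3"
    ```

    Reading Result twice yields the same value because the Task ran once, while every Async.RunSynchronously call executes the body again. That is the behaviour you want from a poll.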

    One way to start it is via Async.RunSynchronously as I'm doing in my example above. This is not the best way, because it blocks the current thread until the computation is done, but it's equivalent to what you were doing with accessing the Task.Result property, which also blocks until the Task is done.
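
    Putting it together, the module from the question could look roughly like this with the second alternative applied. Treat it as a sketch: it assumes the System.Reactive package is referenced and that the endpoint from the question is reachable, and it keeps the blocking call discussed above rather than replacing it.

    ```fsharp
    open System
    open System.Net.Http
    open System.Reactive.Linq

    module MyObservable =
        // A cold async value: building it sends no request.
        let getResources : Async<string> =
            async {
                use client = new HttpClient()
                let! response = client.GetStringAsync("http://localhost:8080/items") |> Async.AwaitTask
                return response
            }

        let getObservable (interval: TimeSpan) : IObservable<string> =
            // Each tick starts the async afresh, so each tick makes a new REST call.
            // RunSynchronously still blocks the thread that delivers the tick.
            let f () = getResources |> Async.RunSynchronously
            Observable.Interval(interval)
            |> Observable.map (fun _ -> f ())
    ```

    Subscribing works the same way as in the question. The only change is that getResources stays an async and is started inside f, once per tick.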