Search code examples
f#server-sent-events.net-4.8

Streaming Server-Sent Events (SSE) in F#


What is a lightweight way to stream "server-sent events (SSE) style" events to the front-end in F#, using the System.Net.Http library? I understand the Event stream format (e.g. this PHP example code), but I am seeking some guidance to implement the streaming part in a server-side F# application (I'm on .Net Framework 4.8).


Solution

  • The following minimalistic, rudimentary code works (OS: Windows10, Browser: Google Chrome 92.0.4515, .Net Framework 4.8) :

    F# client-side code:

    module SSE0 =
        open System
        open System.IO
        open System.Net
    
    let pipeUTF8 (data: string) (sink: Stream) : Async<unit> = async {
        let bytes = System.Text.Encoding.UTF8.GetBytes data
        use src = new MemoryStream(bytes)
        do! src.CopyToAsync(sink) |> Async.AwaitTask }
    
    let private (=>=) data sink = pipeUTF8 data sink
    
    type Msg = { id: string; event: string; data: string } with
        member this.send (sink: Stream) : Async<unit> = async { 
            do! (sprintf "id:%s\n" this.id) =>= sink
            do! (sprintf "event:%s\n" this.event) =>= sink
            do! (sprintf "data:%s\n\n" this.data) =>= sink // Only works for single-line data payloads (won't work if eol included)
            do! " \n" =>= sink
            do! Async.Sleep 1000 // only for this basic example
            Console.WriteLine(sprintf "id: %s, event: %s, data: %s" this.id this.event this.data)
            do! sink.FlushAsync() |> Async.AwaitTask}
    
    let sse_count (ctx : HttpListenerContext) : Async<unit> =
        let output  = ctx.Response.OutputStream
        let message (i: int) : Msg = { id = sprintf "id#%02d" i; event = "message"; data = sprintf "data#%02d" i }
        let msgs = seq { for i in 0 .. 59 -> let msg = message i in async { do! msg.send output } }
        msgs |> Async.Sequential |> Async.Ignore
    
    let startServer (url: string) (handler: HttpListenerContext -> Async<unit>) (cts: Threading.CancellationTokenSource) : Threading.CancellationTokenSource =
        let task = async {
            use listener = new HttpListener()
            listener.Prefixes.Add(url)
            listener.Start()
            while true do
                let! context = listener.GetContextAsync() |> Async.AwaitTask
                let resp = context.Response
                [ ("Content-Type", "text/event-stream; charset=utf-8") 
                ; ("Cache-Control", "no-cache")
                ; ("Access-Control-Allow-Origin", "*") ] // or Access-Control-Allow-Origin: http://localhost:3000
                |> List.iter(fun (k, v) -> resp.AddHeader(k, v))
                Async.Start (handler context, cts.Token)
            }
        Async.Start (task, cts.Token)
        cts
    
    [<EntryPoint>]
    let main argv =
        let cts' = defaultArg None <| new Threading.CancellationTokenSource()
    
        Console.WriteLine("Press return to start.")
        Console.ReadLine() |> ignore
    
        Console.WriteLine("Running...")
        let cts = startServer "http://localhost:8080/events/" sse_count cts'
        
        Console.WriteLine("Press return to exit.")
        Console.ReadLine() |> ignore
        cts.Cancel()
        0
    

    html document:

    <!DOCTYPE html>
    <html>
        <head>
            <meta charset="utf-8" />
            <title>SSE test</title>
        </head>
    
        <body>
            <button id="btn">Close the connection</button>
            <ul id="msglist"></ul>
    
            <script>
                var es = new EventSource("http://localhost:8080/events/");
                
                es.onopen = function() {
                    console.log("Connection to server opened.");
                };
                
                var msgList = document.getElementById("msglist");
                es.onmessage = function(e) {
                    console.log("type: " + e.type +  ", id: " + e.lastEventId + ", data: " + e.data);
                    
                    var newElement = document.createElement("li");
                    newElement.textContent = "type: " + e.type +  ", id: " + e.lastEventId + ", data: " + e.data;
                    msgList.appendChild(newElement);
                };
    
                var btn = document.getElementById("btn");
                btn.onclick = function() {
                    console.log("Connection closed");
                    es.close();
                }
                
                es.onerror = function(e) {
                    console.log("Error found.");
                };
            </script>
        </body>
    

    The following resources were useful to get this done :

    https://github.com/mdn/dom-examples/tree/master/server-sent-events

    https://github.com/haf/FSharp.EventSource