Search code examples
node.jshttpwebserverserver-sent-eventsdeno

Deno: Server Sent Events


Server Sent Events are a valuable tool to open a persistent connection to a web server, where the server has the ability to push new data to the client, when available.

Using this technology in Node.js is quite straightforward and can be implemented with the following code example:

#!/usr/bin/env node
'use strict';

const http = (options, listener) => require('http').createServer(listener).listen(options.port);

http({ port: 8080 }, (req, res) => {
  switch (req.url) {
    case '/server-sent-events': {
      res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Connection': 'keep-alive',
        'Cache-Control': 'no-cache',
      });

      const sendDate = () => res.write(`data: ${new Date()}\n\n`);
      sendDate();
      const interval = setInterval(sendDate, 1000);

      req.on('close', () => clearInterval(interval));
    } break;

    default: {
      res.writeHead(200, {
        'Content-Type': 'text/html; charset=utf-8',
      });
      res.end(`
        <!DOCTYPE html>
        <html>
          <head>
            <title>Server Send Events</title>
            <meta charset="utf-8">
            <script>
              const sse = new EventSource('/server-sent-events');
              sse.onerror = () => document.body.innerHTML = 'Connection Error';
              sse.onmessage = ({ data }) => document.body.innerHTML = data;
            </script>
          </head>
          <body></body>
        </html>
      `);
    }
  }
});

Unfortunately I am not able to achieve the same goal with Deno, as there is no simple write method on the request object, but I guess it has to be implemented somehow using the req.w buffer. Can you help me please finish off the following example code, so the Server Sent Events can be utilised with Deno as well?

#!/usr/bin/env deno run --allow-net

import { listenAndServe as http } from 'https://deno.land/std/http/server.ts';

http({ port: 8080 }, (req) => {
  switch (req.url) {
    case '/server-sent-events': {
      // missing steps:
      // * setup the server sent event headers
      // * create the interval and send the date periodically
      // * clear the interval when the connection gets closed
    } break;

    default: {
      req.respond({
        headers: new Headers({
          'Content-Type': 'text/html; charset=utf-8',
        }),
        body: `
          <!DOCTYPE html>
          <html>
            <head>
              <title>Server Send Events</title>
              <meta charset="utf-8">
              <script>
                const sse = new EventSource('/server-sent-events');
                sse.onerror = () => document.body.innerHTML = 'Connection Error';
                sse.onmessage = ({ data }) => document.body.innerHTML = data;
              </script>
            </head>
            <body></body>
          </html>
        `,
      });
    }
  }
});

Thank you very much for your support!

[Update 2021-11-04]:

I have made some progress doing some research across different sources (https://deno.land/std@0.76.0/http/server.ts, https://github.com/denoland/deno/issues/4817) and got a step closer to the solution. Using the updated example below at least the setup and usage of the Server Sent Events do work now. The remaining issue (besides cleaning up and refactoring of the code) remains the safe detection when the incoming request has been closed (see comments in the source code below):

#!/usr/bin/env deno run --allow-net

import { listenAndServe as http } from 'https://deno.land/std/http/server.ts';

http({ port: 8080 }, (req) => {
  switch (req.url) {
    case '/server-sent-events': {
      // set up a quick´n´dirty write method without error checking
      req.write = (data) => {
        req.w.write(new TextEncoder().encode(data));
        req.w.flush();
      };

      // setup the server sent event headers
      let headers = '';
      headers += 'HTTP/1.1 200 OK\r\n';
      headers += 'Connection: keep-alive\r\n';
      headers += 'Cache-Control: no-cache\r\n';
      headers += 'Content-Type: text/event-stream\r\n';
      headers += '\r\n';
      req.write(headers);

      // create the interval and send the date periodically
      const sendDate = () => req.write(`data: ${new Date()}\n\n`);
      sendDate();
      const interval = setInterval(sendDate, 1000);

      // final missing step:
      // * clear the interval when the connection gets closed

      // currently dropping the connection from the client will
      // result in the error: Uncaught (in promise) BrokenPipe:
      // Broken pipe (os error 32)
      // this error also does not seem to be catchable in the 
      // req.write method above, so there needs to be another safe
      // way to prevent this error from occurring.
    } break;

    default: {
      req.respond({
        headers: new Headers({
          'Content-Type': 'text/html; charset=utf-8',
        }),
        body: `
          <!DOCTYPE html>
          <html>
            <head>
              <title>Server Send Events</title>
              <meta charset="utf-8">
              <script>
                const sse = new EventSource('/server-sent-events');
                sse.onerror = () => document.body.innerHTML = 'Connection Error';
                sse.onmessage = ({ data }) => document.body.innerHTML = data;
              </script>
            </head>
            <body></body>
          </html>
        `,
      });
    }
  }
});

[Update 2021-04-16]

All issues have been resolved and are posted in my accepted answer below.


Solution

  • In the end I have found an answer to my question and so the full answer with plenty of comments follows, so you get a working version of server sent events in Deno. The solution below also solves the os error 32, which gets caused by not catching the connection writer flash method:

    #!/usr/bin/env deno run --allow-net
    
    // imports
    import { ServerRequest, listenAndServe as http } from 'https://deno.land/std/http/server.ts';
    
    // commodity
    const encoder = new TextEncoder();
    const print = console.log;
    
    
    // start the web-server
    // this one allows the endpoint `/server-sent-events`, which hosts a clock that
    // will be refreshed every second (the efficiency of the clock solution could of
    // course be optimised, as every client gets its own clock interval, but this
    // this does not matter as this example wants to show how to setup and clean a
    // task for every connecting client)
    // all other requests will be answered with a simple html page that subscribes
    // to the sse-based clock
    http({ port: 8080 }, async (req) => {
      // ip address of the client (formatted as `ip:port`, so we cut the `:port` part
      // of it)
      const ip = req.headers.get('host').split(':').slice(0, -1).join(':');
    
      // determine the endpoint to access
      switch (req.url) {
        // host the server sent event based clock
        case '/server-sent-events': {
          // logging
          print(`+ Client ${ip} connected`);
    
          // prepare the disconnect promise. we will use this one later on to await
          // the clients disconnect, so we can properly clean up. so the promise will
          // be resolved manually by us when we detect a disconnect from the client
          // on an attempt to send new data to him (unfortunately there seems to be
          // no other way to detect when the client actually closed the connection)
          let resolver;
          const disconnect = new Promise((resolve) => resolver = resolve);
    
          // write helper
          req.write = async (data) => {
            // send the current data to the client
            req.w.write(encoder.encode(data));
    
            // to actually send the data we need to flush the writer first. we need
            // to try/catch this part, as not handling errors on flush will lead to
            // the `Broken pipe (os error 32)` error
            try {
              await req.w.flush();
            } catch(err) {
              // throw any errors but the broken pipe, which gets thrown when the
              // client has already disconnected and we try to send him new data
              // later on
              if (err.name !== 'BrokenPipe') {
                throw err;
              }
    
              // close the connection from our side as well
              req.conn.close();
    
              // resolve our `disconnect` promise, so we can clean up
              resolver();
            }
          };
    
          // date writer (interval method which pushes the current date to the client)
          const sendDate = async () => await req.write(`data: ${new Date()}\n\n`);
    
          // prepare and send the headers
          let headers = '';
          headers += `HTTP/1.1 200 OK\r\n`;
          headers += `Connection: keep-alive\r\n`;
          headers += `Cache-Control: no-cache\r\n`;
          headers += `Content-Type: text/event-stream\r\n`;
          headers += `\r\n`;
          await req.write(headers);
    
          // send the date now for the first time and then every second
          sendDate();
          const interval = setInterval(sendDate, 1000);
    
          // await until the clients disconnects to clean up. so we will be "stuck"
          // here until a disconnect gets detected as we use a promise based approach
          // to detect the disconnect
          await disconnect;
          clearInterval(interval);
    
          // logging
          print(`- Client ${ip} disconnected`);
        } break;
    
        // all other requests host a simple html page which subscribes to the clock
        default: {
          print(`* Serve website to ${ip}`);
          req.respond({
            headers: new Headers({
              'Content-Type': 'text/html; charset=utf-8',
            }),
            body: `
              <!DOCTYPE html>
              <html>
                <head>
                  <title>Server Sent Events</title>
                  <meta charset="utf-8">
                  <script>
                    const sse = new EventSource('/server-sent-events');
                    sse.onerror = () => document.body.innerHTML = 'Connection Error';
                    sse.onmessage = ({ data }) => document.body.innerHTML = data;
                  </script>
                </head>
                <body></body>
              </html>
            `,
          });
        }
      }
    });