Tags: node.js, postgresql, sockets, socket.io, knex.js

Query a table then stream results to socket.io with knex and postgres


I want to run a larger query against a table and write the rows to a socket as they come in. I have been reading the documentation for streams and am trying to implement this with socket.io. Below is my example of a /users route, where app is an Express instance and io is a socket.io instance.

module.exports = function (app, io) {
  app.get('/users', function (req, res, next) {
    const limit = req.query.limit || 100;  // Express exposes query-string parameters on req.query
    const stream = req.db.select('*').from('users').limit(limit).stream();
    req.on('close', stream.end.bind(stream));  // manually close on request cancel
    // how can you stream to the socket?
    // how do you know when the amount is reached to end the response?
  });
}

What I am wondering is: how can I stream the results of this query into the io socket? Essentially, I want to emit an added event whenever a result is found, with the table name, the id, and the found entry as arguments.


Solution

  • how can you stream to the socket?

    Listen for the data event on the knex stream, which fires for each row the database returns, and pass each row along to socket.io via io.emit.

  • how do you know when the amount is reached to end the response?

    The stream emits an end event once all rows have been read, so that is how you know the query is done. Keep in mind that you are accepting requests on an HTTP channel but delivering the results over a separate WebSocket channel. The HTTP response therefore does not have to wait for the DB query results: you can respond right away with res.send() if you like.

    module.exports = function (app, io) {
      app.get('/users', function (req, res, next) {
        // Query-string values arrive as strings, so parse the limit explicitly.
        const limit = parseInt(req.query.limit, 10) || 100;
        const stream = req.db.select('*').from('users').limit(limit).stream();
        stream
          .on('data', function (row) {
            io.emit('user', row);  // one event per row, as it arrives
          })
          .on('error', function (error) {
            io.emit('error', error.message);
            next(error);  // let Express answer the pending HTTP request
          })
          .on('end', function () {
            io.emit('end');
            res.end();  // all rows sent; finish the HTTP request
          });
        req.on('close', stream.end.bind(stream));  // manually close on request cancel
      });
    };
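
The question also asks for an added event that carries the table name, the id, and the row itself. Below is a minimal sketch of that variant, assuming each row has an id column. The helper forwardRows and the query_error event name are hypothetical, not part of knex or socket.io. So that the sketch runs without a database or a socket server, a plain Readable stands in for the knex stream and an EventEmitter stands in for io.

```javascript
const { Readable } = require('stream');
const { EventEmitter } = require('events');

// Hypothetical helper: forwards every row of an object-mode stream to an
// emitter as an "added" event with (tableName, id, row) as arguments.
// `emitter` only needs an emit() method, so a socket.io server should fit.
// Resolves with the number of rows forwarded once the stream ends.
function forwardRows(stream, emitter, tableName) {
  return new Promise(function (resolve, reject) {
    let count = 0;
    stream
      .on('data', function (row) {
        count += 1;
        emitter.emit('added', tableName, row.id, row);  // assumes an id column
      })
      .on('error', function (error) {
        // "query_error" is an illustrative event name chosen for this sketch.
        emitter.emit('query_error', tableName, error.message);
        reject(error);
      })
      .on('end', function () {
        emitter.emit('end', tableName, count);
        resolve(count);
      });
  });
}

// Stand-ins for the knex stream and the socket.io server (assumptions).
const fakeRows = Readable.from([
  { id: 1, name: 'ada' },
  { id: 2, name: 'bob' }
]);
const fakeIo = new EventEmitter();
fakeIo.on('added', function (table, id, row) {
  console.log('added', table, id, row);
});

forwardRows(fakeRows, fakeIo, 'users').then(function (count) {
  console.log('rows forwarded:', count);
});
```

In the route above, this would presumably be called as forwardRows(stream, io, 'users'), with the returned promise deciding when to finish the HTTP response.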