javascript · postgresql · express · stream · knex.js

Timeout acquiring a connection when streaming results using Express


We use the following code to stream the results of a query back to the client:

app.get('/events', (req, res, next) => {
  try {
    const stream = db('events')
      .select('*')
      .where({ id_user: 'foo' })
      .stream()

    stream.pipe(JSONStream.stringify()).pipe(res)
  } catch (err) {
    next(err)
  }
})

While the code seems to have an excellent memory-usage profile (stable, low memory usage), it causes seemingly random DB connection acquisition timeouts:

Knex: Timeout acquiring a connection. The pool is probably full. Are you missing a .transacting(trx) call?

This happens in production at seemingly random intervals. Any idea why?


Solution

  • This was painful to debug:

    Alright, this happens because aborted requests (i.e. the client closes the browser mid-request) don't release the connection back to the pool.

    First, make sure you're on the latest knex, or at least v0.21.3, which introduced fixes to stream/pool handling.

    From then on you have a couple of options:

    Either use stream.pipeline instead of stream.pipe; pipeline handles aborted requests correctly, like so:

    const { pipeline } = require('stream')
    
    app.get('/events', (req, res, next) => {
      try {
        const stream = db('events')
          .select('*')
          .where({ id_session: req.query.id_session })
          .stream()
    
        return pipeline(stream, JSONStream.stringify(), res, err => {
          if (err) {
            return console.log(`Pipeline failed with err:`, err)
          }
    
          console.log(`Pipeline ended successfully`)
        })
      } catch (err) {
        next(err)
      }
    })
    

    or listen to the close event on req and destroy the DB stream yourself, like so:

    app.get('/events', (req, res, next) => {
      try {
        const stream = db('events')
          .select('*')
          .where({ id_session: req.query.id_session })
          .stream()
    
        // Not listening to this event will crash the process if
        // stream.destroy(err) is called.
        stream.on('error', () => {
          console.log('Stream was destroyed')
        })
    
        req.on('close', () => {
          // stream.end() does not seem to work, only destroy()
          stream.destroy(new Error('Aborted request'))
        })
    
        stream.pipe(JSONStream.stringify()).pipe(res)
      } catch (err) {
        next(err)
      }
    })
    

    Useful reading: