Search code examples
mysqlnode.jsstreamnode-mysql

Cannot pause pool when streaming data with mysql-node


I use node and the mysql package to stream data from node to client.

The idea is,

define a pool, and queries based on the pool.

Then pass the streaming rows to an array.

If that array's length reaches a length, pause the stream, process the rows, send them to client via websockets.

Resume stream. Repeat until no other rows are left.

I am following the examples on the mysql npm page but I get pool.pause is not a function

Here is the code

var pool  = mysql.createPool({
  connectionLimit : 100,
  host            : config.host,
  user            : config.user,
  password        : config.password,
  database        : config.database
});

//turn simple queries to promises
const query = (str, ar) => {
  return new Promise(function(resolve, reject) {
    pool.query(str, ar, function (error, results, fields) { 
      if (error) {
          return reject(error);
      }
      resolve({results, fields});
    });
  })//promise
}


const userdetails = (ws, data) => {

//do a check, unrelated to streaming
   query('SELECT COUNT(id) as countrows FROM users WHERE category = ? ', [data.category])
  .then((data)=>{  
     if(data.results[0].countrows > 5000){
        // if more than 5000, we stream
        // the following is based on the mysql code found in their page
        // it has no relation to the promise-based query above
        var query = pool.query('SELECT id, name, address, sale, preexisting, amount FROM users WHERE category = ? ', [data.category])

        query.on('result', row => {  
          rowsToProcess.push(row);
          if (rowsToProcess.length >= 100) { 
            pool.pause();
            processRows();
          }
        });

        query.on('end', () => {
          processRows();
        });

       const processRows = (done) => {
         //process some data
         //send them back using websockets  
         ws.send(JSON.stringify({ data })); 
         pool.resume();  
       }
     }
  })
}

I dont know if this is related to making a simple query , a promise or using the pool, or anything else. This gives the TypeError: pool.pause is not a function and I cannot fix it. Please advice.

Thanks


Solution

  • You can try this solution, I have used this many times:

        const mysqlStreamQueryPromise = (queryString, params) => {
            return new Promise((resolve, reject) => {
                let streamData = connection.query(queryString,params).stream();
                let data = [];
                streamData.on('data', item => {
                    streamData.pause();
                    data.push(item);
                    streamData.resume();
                });
                streamData.on('end', end => {
                    return resolve(data);
                });
                streamData.on('error', error => {
                    return reject(error);
                });
            });
        }