I use node and the mysql package to stream data from node to client.
The idea is,
define a pool, and queries based on the pool.
Then pass the streaming rows to an array.
If that array's length reaches a length, pause the stream, process the rows, send them to client via websockets.
Resume stream. Repeat until no other rows are left.
I am following the examples on the mysql npm page but I get pool.pause is not a function
Here is the code
var pool = mysql.createPool({
connectionLimit : 100,
host : config.host,
user : config.user,
password : config.password,
database : config.database
});
//turn simple queries to promises
const query = (str, ar) => {
return new Promise(function(resolve, reject) {
pool.query(str, ar, function (error, results, fields) {
if (error) {
return reject(error);
}
resolve({results, fields});
});
})//promise
}
const userdetails = (ws, data) => {
//do a check, unrelated to streaming
query('SELECT COUNT(id) as countrows FROM users WHERE category = ? ', [data.category])
.then((data)=>{
if(data.results[0].countrows > 5000){
// if more than 5000, we stream
// the following is based on the mysql code found in their page
// it has no relation to the promise-based query above
var query = pool.query('SELECT id, name, address, sale, preexisting, amount FROM users WHERE category = ? ', [data.category])
query.on('result', row => {
rowsToProcess.push(row);
if (rowsToProcess.length >= 100) {
pool.pause();
processRows();
}
});
query.on('end', () => {
processRows();
});
const processRows = (done) => {
//process some data
//send them back using websockets
ws.send(JSON.stringify({ data }));
pool.resume();
}
}
})
}
I dont know if this is related to making a simple query , a promise or using the pool, or anything else. This gives the TypeError: pool.pause is not a function
and I cannot fix it. Please advice.
Thanks
You can try this solution, I have used this many times:
const mysqlStreamQueryPromise = (queryString, params) => {
return new Promise((resolve, reject) => {
let streamData = connection.query(queryString,params).stream();
let data = [];
streamData.on('data', item => {
streamData.pause();
data.push(item);
streamData.resume();
});
streamData.on('end', end => {
return resolve(data);
});
streamData.on('error', error => {
return reject(error);
});
});
}