Tags: mysql, node.js, node-mysql

Creating synchronous queries with node-mysql


I'm trying to ensure that one MySQL query is not considered complete until all of its child queries have completed. For example, I start with one select, stream the rows, and execute subsequent queries from each row result. This is doable with callbacks, but I end up running out of memory, so I'd like to slow the process down and run it in batches. Due to the async nature of the dispatch, though, I can't keep things in phase and end the connection only after all the rows have been processed.

Here's an example:

var query = conn.query('select id from table1 limit 10');

query.on('result', function(row){
    console.log('query1', row);
    var query2 = conn.query('select id from books where id = ?', [row.id]);
    query2.on('result', function(row2){
        console.log('query2', row2);
        var query3 = conn.query('insert into test (id) values (?)', [row2.id]);
        query3.on('result', function(row3){
            console.log(row3);
        });
    });
});

// 'end' fires when the first query's rows are exhausted, not when the
// nested queries have finished, so the connection is closed too early
query.on('end', function(){
    conn.end();
});

The above fails because there are still rows to process in query3 after the initial query has ended.
Any thoughts? The actual code is even more complicated, because I have to process XML from the subsequent queries and fire off even more inserts as I loop through the batch.
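
I've also seen that node-mysql can throttle a streamed result with conn.pause() / conn.resume(), which would presumably help with the memory pressure; a rough sketch (processRow here is just a stand-in for my nested queries):

var query = conn.query('select id from table1 limit 10');

query.on('result', function(row){
    // stop further 'result' events until this row is dealt with
    conn.pause();
    processRow(row, function(){
        conn.resume();
    });
});

But that still doesn't tell me when all of the nested queries have finished, so I don't know when it's safe to end the connection.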

Thanks!


Solution

  • I would suggest this solution using the async module:

    var async = require("async");
    var mysql = require("mysql");
    
    // connection instance (placeholder credentials -- use your own)
    var conn = mysql.createConnection({
        host: "localhost",
        user: "user",
        password: "password",
        database: "test"
    });
    
    // here goes the task-serving logic;
    // if any async work has to finish before the drain callback, push it onto q
    var solvers = {
        query: function(q, task, row){
            console.log('query1', row);
            q.push({
                solver: "query2",
                req: "select id from books where id = ?",
                reqArgs: [row.id]
            });
        },
        query2: function(q, task, row){
            console.log('query2', row);
            q.push({
                solver: "query3",
                req: "insert into test (id) values (?)",
                reqArgs: [row.id]
            });
        },
        query3: function(q, task, row){
            // for an insert, 'result' fires once with the OkPacket
            console.log(row);
        }
    };
    
    // here is the queue of tasks
    var q = async.queue(function(task, cb){
        var query = conn.query(task.req, task.reqArgs);
        query.on("end", cb);
        query.on("result", function(row){
            solvers[task.solver](q, task, row);
        });
    }, 2); // limit of concurrent tasks; note that a single connection
           // serializes queries, so this bounds in-flight tasks rather
           // than giving true parallelism
    
    // called once every queued task has reached "end"
    // (with async 3.x, register it as q.drain(function(){ ... }) instead)
    q.drain = function(){
        conn.end();
        // continue from here
    };
    
    // initial task
    q.push({
        solver: "query",
        req: "select id from table1 limit 10",
        reqArgs: []
    });
    

    But still, I'm not sure that making requests ID by ID is a good solution.
    Maybe I'm just not aware of the full problem.
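
    If the per-row work really is just the select-then-insert from the question, a single set-based statement avoids the per-ID round trips entirely. A sketch, assuming the same table1/books/test schema:

    conn.query(
        'insert into test (id) ' +
        'select b.id from books b ' +
        'join (select id from table1 limit 10) t on b.id = t.id',
        function(err, result){
            if (err) throw err;
            console.log('inserted', result.affectedRows, 'rows');
            conn.end();
        }
    );

    Of course, that stops working as soon as the XML processing from the question has to happen in node between the select and the inserts.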