Search code examples
mysql, node.js, eventemitter

Node MySQL Pool Cluster Event Emitter Not Working


The documentation gives examples of registering connection event emitters for pools:

// Register a listener for the pool's 'connection' event; the listener
// issues a session-level SET statement on the connection it receives.
pool.on('connection', function (connection) {
    connection.query('SET SESSION auto_increment_increment=1');
});

Documentation shows how to get a pool from a cluster:

// Ask the cluster for a pool using the pattern 'SLAVE*' and the 'RANDOM' selector argument.
var pool = poolCluster.of('SLAVE*', 'RANDOM');
// The returned object is then used through getConnection() and query().
pool.getConnection(function (err, connection) {});
pool.getConnection(function (err, connection) {});
pool.query(function (error, results, fields) {});

However, calling pool.on( ... ) on the object returned by var pool = poolCluster.of('SLAVE*', 'RANDOM'); fails with "TypeError: pool.on is not a function".

Trying to register the .on('connection') via the cluster executes without error but has no effect.

Code to reproduce:

// Minimal reproduction: register a 'connection' listener on the cluster and on
// the object returned by cluster.of(), then request one connection and see
// which listeners run.
var mysql = require('mysql');

var mysql_pool_cluster = mysql.createPoolCluster();
mysql_pool_cluster.add('myPool', {user: 'root', password: 'password'});
// Attempt 1: listen on the cluster itself.
mysql_pool_cluster.on('connection', function(new_conn) {
    console.log('hello from cluster event emitter');
    new_conn.release();
});

var pool = mysql_pool_cluster.of('myPool', 'ORDER');
// Attempt 2: listen on the object returned by of(); wrapped in try/catch so
// the script keeps running and logs the error if the call throws.
try {
    pool.on('connection', function(new_conn) {
        console.log('hello from pool event emitter');
        new_conn.release();
    });
} catch(err) {
    console.error(err);
}

// Blank line separating any error output above from the connection output below.
console.log('');

// Request a connection, then shut the cluster down once it has been released.
pool.getConnection(function(err, conn) {
    if (err) {
        console.error(err);
    } else {
        console.log('hello from new connection');
        conn.release();

        mysql_pool_cluster.end(function(err) {
            if (err) {
                console.error(err);
            }
        });
    }
});

Output from above code:

TypeError: pool.on is not a function
    at Object.<anonymous> (E:\scratch\scratch_server.js:14:7)
    at Module._compile (module.js:570:32)
    at Object.Module._extensions..js (module.js:579:10)
    at Module.load (module.js:487:32)
    at tryModuleLoad (module.js:446:12)
    at Function.Module._load (module.js:438:3)
    at Module.runMain (module.js:604:10)
    at run (bootstrap_node.js:394:7)
    at startup (bootstrap_node.js:149:9)
    at bootstrap_node.js:509:3

hello from new connection

As you can see, pool.on('connection') throws instead of registering a listener, and the listener registered with cluster.on('connection') is never called, even though .getConnection() is being run for the first time.


Solution

  • I decided that the cluster library was not up to spec, so I coded my own cluster class:

    var mysql = require('mysql');
    var Promise = require('promise');
    
    var deepCopy = function(obj) {
        // Clone by serialising to JSON and parsing the text back,
        // as described in https://stackoverflow.com/a/15040626
        var serialized = JSON.stringify(obj);
        return JSON.parse(serialized);
    };
    // Logging sink referenced by the cluster class below; forwards to the console.
    var logger = {
        log: console.log,
        error: console.error
    };
    
    /**
     * Named collection of individual mysql pools.
     *
     * Custom class to work around the event emitter problem in
     * mysql.createPoolCluster() (https://stackoverflow.com/q/44466894):
     * each pool is created with mysql.createPool(), so event listeners can be
     * registered on it directly, and promise-returning helpers are attached
     * to every pool and to the connections passed to its 'connection' event.
     */
    class mysqlPoolCluster {
        /**
         * Creates an empty cluster.
         *
         * _pool_dict maps a pool name to its pool object.
         * _future_pools_on_events_dict maps an event name to the list of
         * callbacks registered through on(); they are replayed onto every
         * pool created afterwards.
         */
        constructor() {
            this._pool_dict = {};
            this._future_pools_on_events_dict = {};
        }

        /**
         * Expands a shared configuration with per-pool overrides into one
         * complete configuration per pool. The argument is not modified.
         *
         * @param {Object} segregated_confs_dict - shared settings plus a
         *     'pools' property mapping pool names to pool-specific settings.
         * @returns {Object} pool name -> merged configuration; pool-specific
         *     values win over the shared ones.
         * @throws {Error} if the argument is missing or has no usable 'pools'
         *     property.
         */
        mergeAndSplitConfs(segregated_confs_dict) {
            /*
                Converts this object --
                {
                    pools: {
                        admin: {connectionLimit: 1, user: 'my_admin', password: 'password'},
                        read: {user: 'my_reader', password: 'password'},
                        write: {user: 'my_writer', password: 'password'},
                        read_write: {user: 'my_reader_writer', password: 'password'}
                    },
                    host: 'localhost',
                    database: 'my_db',
                    connectionLimit: 2
                }

                to this object --
                {
                    admin: {
                        connectionLimit: 1,
                        user: 'my_admin',
                        password: 'password',
                        host: 'localhost',
                        database: 'my_db'
                    },
                    read: {
                        connectionLimit: 2,
                        user: 'my_reader',
                        password: 'password',
                        host: 'localhost',
                        database: 'my_db'
                    },
                    write: {
                        connectionLimit: 2,
                        user: 'my_writer',
                        password: 'password',
                        host: 'localhost',
                        database: 'my_db'
                    },
                    read_write: {
                        connectionLimit: 2,
                        user: 'my_reader_writer',
                        password: 'password',
                        host: 'localhost',
                        database: 'my_db'
                    }
                }
            */

            // Validate BEFORE copying: deepCopy() round-trips through JSON, and
            // JSON.parse(JSON.stringify(undefined)) throws a SyntaxError, which
            // would hide the descriptive error below when 'pools' is missing.
            if (!segregated_confs_dict || !segregated_confs_dict.pools) {
                throw new Error("arg does not have property 'pools'");
            }

            var pools_dict = deepCopy(segregated_confs_dict.pools);

            // Everything except 'pools' is a shared default.
            var base_conf = deepCopy(segregated_confs_dict);
            delete base_conf.pools;

            var base_keys = Object.keys(base_conf);
            var pool_names = Object.keys(pools_dict);
            for(var i_pool_name=0; i_pool_name < pool_names.length; i_pool_name++) {
                var pool_conf = pools_dict[pool_names[i_pool_name]];
                for(var i_base_key=0; i_base_key < base_keys.length; i_base_key++) {
                    var base_key = base_keys[i_base_key];
                    // Only fill in what the pool did not set itself.
                    if (!pool_conf.hasOwnProperty(base_key)) {
                        pool_conf[base_key] = deepCopy(base_conf[base_key]);
                    }
                }
            }

            return pools_dict;
        }

        /**
         * Creates one pool per entry of confs_dict.
         *
         * @param {Object} confs_dict - pool name -> configuration, i.e. the
         *     return value of this.mergeAndSplitConfs().
         * @throws whatever createAndAddPool() throws; in that case shutdown of
         *     every pool in the cluster is started first (its own failures
         *     are only logged) and the original error is rethrown.
         */
        populatePools(confs_dict) {
            var names = Object.keys(confs_dict);
            try {
                for(var i_name=0; i_name < names.length; i_name++) {
                    var name = names[i_name];
                    this.createAndAddPool(name, confs_dict[name]);
                }
            } catch(err) {
                // Best-effort cleanup; not awaited so the original error surfaces immediately.
                this.endClusterAndRemovePoolsPromiser()
                .catch(logger.error);

                throw err;
            }
        }

        /**
         * Creates a pool with mysql.createPool(conf), stores it under 'name'
         * and decorates it:
         *   - every connection passed to the pool's 'connection' event gets a
         *     queryPromiser(sql, args) method,
         *   - the pool gets queryPromiser(sql, args) and getConnectionPromiser(),
         *   - every callback previously registered through on() is attached.
         *
         * The queryPromiser variants resolve with {results, fields};
         * getConnectionPromiser resolves with the connection.
         *
         * @param {string} name - key for the new pool.
         * @param {Object} conf - configuration handed to mysql.createPool().
         * @returns {Object} the new pool.
         * @throws {Error} if a pool with that name already exists, or if
         *     decorating the pool fails (the pool is then ended and removed
         *     in the background and the original error is rethrown).
         */
        createAndAddPool(name, conf) {
            if (this._pool_dict.hasOwnProperty(name)) {
                throw new Error("pool '" + name + "' already exists");
            }

            this._pool_dict[name] = mysql.createPool(conf);

            try {
                this.getPool(name).on('connection', function(conn) {
                    conn.queryPromiser = function(sql, args) {
                        return new Promise(function(resolve, reject) {
                            conn.query(
                                sql,
                                args,
                                function(err, results, fields) {
                                    if (err) {
                                        reject(err);
                                    } else {
                                        resolve( {"results": results, "fields": fields} );
                                    }
                                }
                            );
                        });
                    };
                });

                var that = this;

                // The helpers look the pool up by name at call time, so they
                // reject (getPool() throws inside the promise) once the pool
                // has been removed from the cluster.
                this.getPool(name).queryPromiser = function(sql, args) {
                    return new Promise(function(resolve, reject) {
                        that.getPool(name).query(
                            sql,
                            args,
                            function(err, results, fields) {
                                if (err) {
                                    reject(err);
                                } else {
                                    resolve( {"results": results, "fields": fields} );
                                }
                            }
                        );
                    });
                };

                this.getPool(name).getConnectionPromiser = function() {
                    return new Promise(function(resolve, reject) {
                        that.getPool(name).getConnection(
                            function(err, conn) {
                                if (err) {
                                    reject(err);
                                } else {
                                    resolve(conn);
                                    // remember to call conn.release() when you're finished with the conn
                                }
                            }
                        );
                    });
                };

                // Replay listeners that were registered on the cluster before this pool existed.
                var events = Object.keys(this._future_pools_on_events_dict);
                for(var i_event=0; i_event < events.length; i_event++) {
                    var event = events[i_event];
                    for(var i_cb=0; i_cb < this._future_pools_on_events_dict[event].length; i_cb++) {
                        this.getPool(name).on(event, this._future_pools_on_events_dict[event][i_cb]);
                    }
                }

                return this.getPool(name);
            } catch(err) {
                // Do not leave a half-configured pool behind.
                this.endAndRemovePoolPromiser(name)
                .catch(logger.error);

                throw err;
            }
        }

        /**
         * Looks up a pool by name.
         *
         * @param {string} name
         * @returns {Object} the pool stored under 'name'.
         * @throws {Error} if no such pool exists.
         */
        getPool(name) {
            if (this._pool_dict.hasOwnProperty(name)) {
                return this._pool_dict[name];
            } else {
                throw new Error("pool '" + name + "' does not exist");
            }
        }

        /**
         * Ends the named pool and removes it from the cluster. The pool is
         * removed whether or not ending it reported an error.
         *
         * @param {string} name
         * @returns {Promise} resolves with no value on success; rejects with
         *     the error from pool.end(), or with the getPool() error when the
         *     pool does not exist.
         */
        endAndRemovePoolPromiser(name) {
            var that = this;
            return new Promise(function(resolve, reject) {
                that.getPool(name).end(function(err) {
                    delete that._pool_dict[name];

                    if (err) {
                        reject(err);
                    } else {
                        resolve();
                    }
                });
            });
        }

        /**
         * Ends and removes every pool currently in the cluster.
         *
         * @returns {Promise} resolves once all pools were ended without error;
         *     otherwise rejects with an array of the individual errors. One
         *     failing pool does not stop the others from being ended.
         */
        endClusterAndRemovePoolsPromiser() {
            var end_promises = [];
            var err_list = [];
            var names = Object.keys(this._pool_dict);
            for(var i_name=0; i_name < names.length; i_name++) {
                end_promises.push(
                    this.endAndRemovePoolPromiser(names[i_name])
                    // Collect instead of propagating so Promise.all waits for every pool.
                    .catch(function(err) {
                        err_list.push(err);
                    })
                );
            }

            return Promise.all(end_promises)
            .then(function() {
                if (err_list.length) {
                    return Promise.reject(err_list);
                }
            });
        }

        /**
         * Registers an event listener on every existing pool and remembers it
         * so that pools created later receive it as well.
         *
         * @param {string} event - event name, e.g. 'connection'.
         * @param {Function} cb - listener passed to each pool's on().
         */
        on(event, cb) {
            var names = Object.keys(this._pool_dict);
            for(var i_name=0; i_name < names.length; i_name++) {
                this.getPool(names[i_name]).on(event, cb);
            }

            if (!this._future_pools_on_events_dict.hasOwnProperty(event)) {
                this._future_pools_on_events_dict[event] = [];
            }
            this._future_pools_on_events_dict[event].push(cb);
        }
    }