node.js, multithreading, postgresql, cluster-computing, node-cluster

Node cluster: Hand each task to only one worker


I'm trying to set up cluster to multi-thread my app, but instead of each task being delegated to one worker, every task is delegated to all workers.

My code goes as follows:

//main.js
var cluster = require('cluster');

//Code executed by the master process.
if (cluster.isMaster) {
    var numWorkers = require('os').cpus().length;
    console.log('Setting up ' + numWorkers + ' workers.');

    for (var i = 0; i< numWorkers; i++) {
        cluster.fork();
    }

    cluster.on('online', function(worker) {
        console.log('Worker ' + worker.process.pid + ' is online');
    });

    //Spawn a new worker on unexpected events.
    cluster.on('exit', function(worker, code, signal) {
        console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
        console.log('Starting a new worker');
        cluster.fork();
    });
} else {
    //Set up DB
    var pg = require ('pg');
    var db = 'tcp://username:password@localhost/dbname';

    var pg_client = new pg.Client(db);
    pg_client.connect();

    //Set up Listener
    pg_client.query('LISTEN newevent');
    pg_client.on('notification', function(not) {
        console.log(not.payload + ' added to queue by ' + process.pid);
    });
}

Every worker runs the else block, so each time I insert a value into my table (a trigger fires and notifies my pg_client, which is listening) I get as many console.log lines as there are workers. I'd like only one worker to process each Postgres event. How can I fix my code to achieve that?


Solution

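  • If only one process should handle each event, this does not require cluster. Cluster distributes incoming connections evenly across workers; here every worker opens its own LISTEN connection, so Postgres notifies all of them.

    You would then need to change the if-else logic entirely, because...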

    for (var i = 0; i < numWorkers; i++) {
        cluster.fork();
    }
    

    ...expresses explicitly that you want N processes, rather than a single child_process.fork(), which would explicitly run the work outside the main process. Read up on it in the Node.js documentation.
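
If you do want to keep cluster, one possible arrangement is to hold a single LISTEN connection in the master and pass each notification to one worker in turn. The following is only a minimal sketch under that assumption: the helper names createRoundRobin, startMaster and startWorker are made up for illustration, it reuses the pg client and the newevent channel from the question, and it does not replace workers that exit.

```javascript
var cluster = require('cluster');

// Returns a function that sends each message to the next worker in turn.
// `workers` is any array of objects with a send() method (hypothetical helper).
function createRoundRobin(workers) {
    var next = 0;
    return function dispatch(message) {
        var worker = workers[next];
        next = (next + 1) % workers.length;
        worker.send(message);
        return worker;
    };
}

// Master side: fork the workers, then listen on ONE database connection.
function startMaster(connectionString, numWorkers) {
    var pg = require('pg'); // loaded here because only the master needs it
    var workers = [];
    for (var i = 0; i < numWorkers; i++) {
        workers.push(cluster.fork());
    }
    var dispatch = createRoundRobin(workers);

    var client = new pg.Client(connectionString);
    client.connect();
    client.query('LISTEN newevent');
    client.on('notification', function (not) {
        // Each notification goes to exactly one worker.
        dispatch({ payload: not.payload });
    });
}

// Worker side: no database listener, just handle what the master sends.
function startWorker() {
    process.on('message', function (msg) {
        console.log(msg.payload + ' handled by ' + process.pid);
    });
}
```

In main.js the if-else would then call startMaster(db, numWorkers) in the cluster.isMaster branch and startWorker() in the else branch. Because only the master is listening, Postgres delivers each notification once, and the round-robin helper decides which worker receives it.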