Search code examples
javascriptnode.jsservercluster-computing

Data object consistency with several workers node


I am trying to create a simple server which will give every new request to different worker. The DATA object is a simple javascript object in separate file. The problem I faced with is CONSISTENCY of this DATA object.

How to prevent worker from handling the request if the previous request is still proceeding?
For example first request is UPDATE and lasts longer and the next request is DELETE and proceeds faster
What node tool or pattern I need to use to be 100% percent sure that DELETE will happen after UPDATE?
I need to run every worker on a different port

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

cluster.schedulingPolicy = cluster.SCHED_RR;

const PORT = 4000; 

if (cluster.isMaster) {
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
} else {
 http.createServer((req, res) => {
    if(req.url === '/users' && req.method === "PUT") {
       updateUser(req)
    } else if(req.url === '/users' && req.method === "DELETE") {
       deleteUser(req)
    } 

  }).listen(PORT++);
}

Solution

  • Your code won't even create multiple servers set aside the different ports, and the PORT variable is a const, so it won't increment either.

    What node tool or pattern I need to use to be 100% percent sure that DELETE will happen after UPDATE?

    • Use some sort of lock, not yet available on JavaScript
    • Use a semaphore/Mutex variable lock (See code).

    Remember, JavaScript is a single-threaded language.

    need to run every worker on a different port

    For each worker, set the listening based on worker ID (See code). Remember that the CPU cannot have capability to generate workers equal to that of number of cores.

    Sample working code:

    const express = require('express')
    const cluster = require('cluster')
    const os = require('os')
    
    if (cluster.isMaster) {
        for (let i = 0; i < os.cpus().length; i++) {
            cluster.fork()
        }
    } else {
        const app = express()
    
        // Global semaphore/mutex variable isUpdating
        var isUpdating = false;
        const worker = {
            handleRequest(req, res) {
                console.log("handleRequest on worker /" + cluster.worker.id);
                if (req.method == "GET") { // FOR BROWSER TESTING, CHANGE IT LATER TO PUT
                    isUpdating = true;
                    console.log("updateUser GET");
                    // do updateUser(req);
                    isUpdating = false;
                } else if (req.method == "DELETE") {
                    if (!isUpdating) { // Check for update lock
                        console.log("updateUser DELETE");
                        // do deleteUser(req)
                    }
                }
            },
        }
    
        app.get('/users', (req, res) => {
            worker.handleRequest(req, res)
        })
    
        // Now each worker will run on different port
        app.listen(4000 + cluster.worker.id, () => {
            console.log(`Worker ${cluster.worker.id} started listening on port ${4000 + cluster.worker.id}`)
        })
    }