Search code examples
node.jsnonblocking

node js non blocking for loop


Please check if my understanding about the following for loop is correct.

// Calls sample_function once for every i in 0..999.
// The increment matters: with a bare `i` in the update clause the counter
// never changes and the loop would never terminate.
for (let i = 0; i < 1000; i++) {
  sample_function(i, function (result) {});
}

The moment the for loop is invoked, 1000 events of sample_function will be queued in the event loop. After about 5 seconds a user sends an HTTP request, which is queued after those "1000 events". Usually this would not be a problem because the loop is asynchronous. But let's say that this sample_function is a CPU-intensive function. Therefore the "1000 events" are completed consecutively and each takes about 1 second. As a result, the for loop will block for about 1000 seconds.

Would there be a way to solve such a problem? For example, would it be possible to let the thread take a "break" every 10 loops and allow other newly queued events to run in between? If so, how would I do it?


Solution

  • There is a technique called partitioning, which you can read about in the Node.js documentation. But as the documentation states:

    If you need to do something more complex, partitioning is not a good option. This is because partitioning uses only the Event Loop, and you won't benefit from multiple cores almost certainly available on your machine.

    So you can also use another technique called offloading, e.g. using worker threads or child processes, which also have certain downsides, like having to serialize and deserialize any objects that you wish to share between the event loop (current thread) and a worker thread or a child process.

    Following is an example of partitioning that I came up with which is in the context of an express application.

    const express = require('express');
    const crypto = require('crypto');
    const randomstring = require('randomstring');
    
    const app = express();
    const port = 80;
    
    // Root route: answers every GET / with the fixed body 'ok'.
    const handleRoot = async (_request, response) => {
        response.send('ok');
    };
    app.get('/', handleRoot);
    
    // GET /block: awaits block() ten times, one call after the other, and
    // sends the collected resolved values back as { result }.
    app.get('/block', async (_request, response) => {
        const result = [];
        let remaining = 10;
        while (remaining > 0) {
            const hash = await block();
            result.push(hash);
            remaining -= 1;
        }
        response.send({ result });
    });
    
    // Start the HTTP server on `port` and log where it can be reached.
    const announceListening = () => {
        console.log(`Listening on port ${port}`);
        console.log(`http://localhost:${port}`);
    };
    app.listen(port, announceListening);
    
    /**
     * Runs one long hashing pass, deferred to a later event-loop iteration.
     *
     * Takes around 5 seconds to run (varies depending on your processor).
     *
     * @returns {Promise} resolves with the sha256 Hash object once every update
     *   has been applied (no digest is computed here); rejects with the thrown
     *   error if creating or updating the hash fails.
     */
    const block = () => {
        // Promisifying just to get the result back to the caller in an async way;
        // this is not part of the partitioning technique.
        return new Promise((resolve, reject) => {
            /**
             * https://nodejs.org/en/docs/guides/dont-block-the-event-loop/#partitioning
             * Using the partitioning technique (setImmediate/setTimeout) to prevent a
             * long-running operation from blocking the event loop completely:
             * there will be a breathing period between each time block is called.
             */
            setImmediate(() => {
                try {
                    const hash = crypto.createHash("sha256");
                    const numberOfHashUpdates = 10e5; // i.e. 1,000,000 updates
                    for (let iter = 0; iter < numberOfHashUpdates; iter++) {
                        hash.update(randomstring.generate());
                    }
                    resolve(hash);
                } catch (err) {
                    // A throw inside a setImmediate callback is not seen by the
                    // promise executor: without this catch it would surface as an
                    // uncaught exception and the promise would never settle.
                    reject(err);
                }
            });
        });
    };
    

    There are two endpoints, / and /block. If you hit /block and then hit /, the / endpoint will take around 5 seconds to respond: its request is served during the breathing space (what you call a "break") between two calls to block.

    If setImmediate was not used, then the / endpoint would respond to a request after approximately 10 * 5 seconds (10 being the number of times the block function is called in the for loop).

    Also you can do partitioning using a recursive approach like this:

    /**
     * Works through `items` in slices of `chunk`, calling syncBlock once per item
     * and yielding to the event loop (via setImmediate) between slices.
     * The first slice runs synchronously; nothing is returned.
     *
     * @param items array we need to process
     * @param chunk a number indicating number of items to be processed on each iteration of event loop before the breathing space; must be greater than 0
     * @throws {RangeError} if chunk is not a number greater than 0 (such a value
     *   would process nothing per slice and reschedule itself forever)
     */
    function processItems(items, chunk) {
        if (!(chunk > 0)) {
            throw new RangeError('chunk must be a number greater than 0');
        }

        let i = 0;
        // Named processChunk rather than process so that it does not shadow
        // Node's global `process` object inside this function.
        const processChunk = () => {
            let currentChunk = chunk;
            while (currentChunk > 0 && i < items?.length) {
                --currentChunk;
                // NOTE(review): syncBlock is not defined in this snippet and is
                // called without arguments; presumably a placeholder for the
                // per-item synchronous work. Confirm against the caller.
                syncBlock();
                ++i;
            }

            if (i < items?.length) {
                // The key is to schedule the next slice (by passing the function to
                // setImmediate) instead of doing a direct recursive call.
                setImmediate(processChunk);
            }
        };
        processChunk();
    }
    

    And if you need to get back the data processed you can promisify it like this:

    /**
     * Works through `items` in slices of `chunk`, calling syncBlock once per item
     * and yielding to the event loop (via setImmediate) between slices.
     *
     * @param items array we need to process
     * @param chunk number of items to process per event-loop iteration; must be greater than 0
     * @returns {Promise<Array>} resolves with the values returned by syncBlock, in
     *   call order; rejects with a RangeError if chunk is not greater than 0, or
     *   with whatever syncBlock throws.
     */
    function processItems(items, chunk) {
        if (!(chunk > 0)) {
            // Such a chunk would process nothing per slice and reschedule forever,
            // so the returned promise would never settle.
            return Promise.reject(new RangeError('chunk must be a number greater than 0'));
        }

        return new Promise((resolve, reject) => {
            let i = 0;
            const result = [];
            // Named processChunk rather than process so that it does not shadow
            // Node's global `process` object inside this function.
            const processChunk = () => {
                try {
                    let currentChunk = chunk;
                    while (currentChunk > 0 && i < items?.length) {
                        --currentChunk;
                        // NOTE(review): syncBlock is not defined in this snippet and
                        // is called without arguments; presumably a placeholder for
                        // the per-item synchronous work. Confirm against the caller.
                        result.push(syncBlock());
                        ++i;
                    }

                    if (i < items?.length) {
                        setImmediate(processChunk);
                    } else {
                        resolve(result);
                    }
                } catch (err) {
                    // Slices after the first run from setImmediate, outside the
                    // promise executor, so a throw there must be forwarded by hand.
                    reject(err);
                }
            };
            processChunk();
        });
    }
    

    And you can test it by adding this route handler to the other route handlers provided above:

    // GET /block2: builds the array [0..9], runs it through processItems one
    // item per event-loop iteration, and sends the resolved value as { result }.
    app.get('/block2', async (_request, response) => {
        const arr = Array.from({ length: 10 }, (_unused, index) => index);
        const result = await processItems(arr, 1);
        response.send({ result });
    });