Tags: javascript, node.js, loops, settimeout, delay

Read stream with setTimeout maximum value reached error


I am trying to read some large CSV files and process that data. There is a rate limit on the processing, so I want to add a 1-minute delay between each request. I tried setTimeout, but found out there is a limit on the delay you can pass to it and got the following error. I am not sure of any other way to handle the situation; the CSV file has more than 1M records. Am I doing anything wrong here?

Error

(node:41) TimeoutOverflowWarning: 2241362000 does not fit into a 32-bit signed integer.
Timeout duration was set to 1.

Sample code:

const Queue = require('bull');
const domainQueue = new Queue(config.api.crawlerQ, {
  redis: connectRedis(),
});
let ctr = 0;
function processCSV (name, fileName, options)  {
  return new Promise((resolve, reject) => {
    console.log('process csv started', new Date());
    let filePath = config.api.basePath + fileName;
    const stream = fs.createReadStream(filePath)
        .on('error', (error) => {
          // handle error
          console.log('error processing csv');
          reject(error);
        })
        .pipe(csv())
        .on('data', async (row) => {
          ctr++
          increment(row, ctr)
        })
        .on('end', () => {
          console.log('stream processCSV end', fileName, new Date());
          resolve(filePath);
        })
  });

}

async function increment(raw, counter) {
  setTimeout(async function(){
    console.log('say i am inside a function', counter, new Date());
    domainQueue.add(raw, options); // Add jobs to queue - here I need a delay of, say, 1 minute;
    // if I add jobs without a delay it will hit the rate limit
  }, 60000 * counter);

}

function queueWorkerProcess(value) { // Process jobs in queue and save in text file 
  console.log('value', value, new Date());
  return new Promise(resolve => {
    resolve();
  });

}

Solution

  • Here's a general idea. You need to keep track of how many items are in flight (being processed) to both limit the amount of memory used and control the load on whatever resource you're storing the results in.

    When you hit some limit on how many are in flight, you pause the stream. When you get back below the limit, you resume the stream. You increment a counter on .add() and decrement it when the completed event fires, and that's where you pause or resume the stream.

    FYI, just inserting a setTimeout() somewhere won't help you: Node.js caps a timer's delay at 2^31 - 1 ms (about 24.8 days), which is why 60000 * counter overflows once the counter grows past roughly 35,791. To get your memory usage under control, you have to pause the flow of data from the stream once you have too many items in process. Then, when the count gets back under a threshold, you can resume the stream.

    Here's an outline of what that could look like:

    const fs = require('fs');
    const csv = require('csv-parser'); // assuming csv-parser; use whichever streaming CSV parser you already have
    const Queue = require('bull');
    const domainQueue = new Queue(config.api.crawlerQ, {
        redis: connectRedis(),
    });
    
    // counter that keeps track of how many items in the queue
    let queueCntr = 0;
    
    // you tune this constant up or down to manage memory usage or tweak performance
    // this is what keeps you from having too many requests going at once
    const queueMax = 20;
    
    function processCSV(name, fileName, options) {
        return new Promise((resolve, reject) => {
            let paused = false;
    
            console.log('process csv started', new Date());
            const filePath = config.api.basePath + fileName;
    
            const stream = fs.createReadStream(filePath)
                .on('error', (error) => {
                    // handle error
                    console.log('error processing csv');
                    domainQueue.off('completed', completed);
                    reject(error);
                }).pipe(csv())
                .on('data', (row) => {
                    increment(row);
                })
                .on('end', () => {
                    console.log('stream processCSV end', fileName, new Date());
                    domainQueue.off('completed', completed);
                    resolve(filePath);
                });
    
            function completed() {
                --queueCntr;
                // see if queue got small enough we now resume the stream
                if (paused && queueCntr < queueMax) {
                    stream.resume();
                    paused = false;
                }
            }
    
            domainQueue.on('completed', completed);
    
            function increment(row) {
                ++queueCntr;
                domainQueue.add(row, options);
                if (!paused && queueCntr > queueMax) {
                    stream.pause();
                    paused = true;
                }
            }
        });
    }
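    Note that the counting above only works if jobs actually complete: the queue needs a processor registered somewhere so that bull emits the completed events this code listens for. A rough sketch, assuming your existing queueWorkerProcess() is where each job's work happens:

    domainQueue.process(async (job) => {
        // do the real work for one row's job; when this resolves,
        // bull emits 'completed' and queueCntr gets decremented
        await queueWorkerProcess(job.data);
    });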
    

    And, if you're calling processCSV() multiple times with different files, you should sequence them so you don't start the 2nd one until the first one is done, don't start the 3rd one until the 2nd one is done, and so on. You don't show that code, so we can't make a specific suggestion about it.
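    If it helps, here's a minimal sketch of that sequencing, assuming you have an array of file names and that processCSV() resolves once a file has been fully read and queued (the processAllFiles name is just for illustration):

    async function processAllFiles(name, fileNames, options) {
        // run the files strictly one after another; each processCSV()
        // must resolve before the next file starts
        for (const fileName of fileNames) {
            await processCSV(name, fileName, options);
        }
    }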