javascript · node.js · stream · fs

How to manage a stream in order to process incoming data in node.js


For each line in a file, I want to execute a computationally intensive task, such as image compression. The problem I have is that the data comes in too fast and overwhelms the memory. Ideally I'd like to be able to pause and continue the stream as the data gets processed.

I initially tried using the readline module with a file stream like this:

const fs = require('fs')
const readline = require('readline')

const fileStream = fs.createReadStream('long-list.txt')
const rl = readline.createInterface({ input: fileStream })
rl.on('line', (line) => {
   doTheHeavyTask(line)
})

However, this quickly overwhelms the memory with thousands of calls to doTheHeavyTask().

I settled on pushing each line into a queue and creating an event that dequeues the next line when the previous line is done being processed:

const Queue = require('queue-fifo')
const { EventEmitter } = require('events')

const lineQ = new Queue()
rl.on('line', (line) => {
   lineQ.enqueue(line)
})
const lineEmitter = new EventEmitter()
lineEmitter.on('processNextLine', async () => {
    await doTheHeavyTask( lineQ.dequeue() )
    if (!lineQ.isEmpty()) lineEmitter.emit('processNextLine')
})
setTimeout( () => lineEmitter.emit('processNextLine'), 20) // Give rl a moment to enqueue some lines

This works, but it seems kind of hacky and not much better than just reading in the file all at once.

I'm vaguely aware of concepts like "backpressure" and "generators" in JavaScript, but I'm not sure how to apply them.


Solution

  • The problem here is not the stream itself, but the async tasks that you trigger. Every task (whether it is a callback with a closure or an async function) consumes memory. If you start many (thousands of) tasks at the same time, that will use a lot of resources.

    You can use an async iterator to go over the lines and run one task per line, waiting for each to finish:

     (async function () {
         for await (const el of rl) {
             await doTheHeavyTask(el);
         }
     })();
    

    That will apply backpressure correctly: the loop only pulls the next line once the current task has finished.
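
    For reference, a rough manual equivalent (not part of the original answer) is to pause the readline interface while a task runs and resume it afterwards. Note that readline may still deliver a few already-buffered lines after pause(), so this is only a sketch:

     // A sketch: manual backpressure via pause()/resume()
     rl.on('line', async (line) => {
         rl.pause();                 // stop emitting further 'line' events while the task runs
         await doTheHeavyTask(line); // process the current line
         rl.resume();                // ask for more lines once the task is done
     });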

    However, the for await loop only runs one task at a time, which might be quite slow. To buffer a few lines and process them concurrently, you could do:

     const SIZE = 10; // to be tuned by testing different values

     (async function () {
         let chunk = [];
         for await (const el of rl) {
             chunk.push(el);
             if (chunk.length >= SIZE) {
                 // Process the buffered lines concurrently, then start a new chunk
                 await Promise.all(chunk.map(doTheHeavyTask));
                 chunk.length = 0;
             }
         }
         // Process whatever is left in the final, possibly smaller chunk
         await Promise.all(chunk.map(doTheHeavyTask));
     })();
    

    You need at least Node 11.14.0 for async iteration over a readline interface (for await...of rl) to work.
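
    One trade-off of the chunked version (not discussed in the original answer) is that each batch waits for its slowest task before the next batch starts. A sketch of an alternative that keeps a fixed number of tasks in flight instead, reusing rl and doTheHeavyTask from above and assuming doTheHeavyTask returns a promise; CONCURRENCY is an illustrative name and error handling is omitted:

     const CONCURRENCY = 10;

     (async function () {
         const inFlight = new Set();
         for await (const line of rl) {
             // Track the task and drop it from the set once it settles
             const task = doTheHeavyTask(line).finally(() => inFlight.delete(task));
             inFlight.add(task);
             // At the limit, wait for any running task to finish before reading more lines
             if (inFlight.size >= CONCURRENCY) await Promise.race(inFlight);
         }
         await Promise.all(inFlight); // wait for the remaining tasks
     })();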