node.js async-await request fs node-streams

Limiting concurrency when processing a file line-by-line


I'm new to Node.js and am trying to do the following:

  • read a CSV file
  • perform an operation on the data in each line (handled by an API)
  • write the outcome to a new file

I want to avoid loading the whole input file into memory, which is why I've used a stream. However, I think I'm overloading my API by sending too many requests at the same time. Is there a way to limit the number of requests sent to the API?

Here is what I have come up with:

import fs from 'fs';
import csv from 'fast-csv';
import fetch from 'node-fetch';

async function process_data(input){
    // Copy the shared options so concurrent calls do not overwrite each other's body
    const options = {...talk_options, body: JSON.stringify(input)};
    try {
        const response = await fetch('api-url', options);
        const data = await response.json();
        return data.result;
    } catch (err) {
        // Log the failure and fall back to an empty result
        console.log(err);
        return {};
    }
}

let readStream = fs.createReadStream('input.csv');
let writeStream = fs.createWriteStream('output.out');
let csvStream = csv.parse({headers: true});

csvStream.on('data', async function(data) {
    let processed_data = await process_data(data);
    writeStream.write(JSON.stringify(processed_data));
})
.on('end', function(){
    console.log('done');
})
.on('error', function(error){
    console.log(error);
});

let talk_options = {
    method: 'POST',
    headers: {'Content-Type': "application/json"},
    body: null
};

readStream.pipe(csvStream);

Thanks for any help or information in the right direction.


Solution

  • You can rate-limit the code that processes the data. You will likely have to do this anyway, since you are calling an API.

    I highly recommend the bottleneck library, used with its reservoir interval settings.
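
    To make the idea concrete, here is a minimal, dependency-free sketch of a limiter. It is not bottleneck's API, and it caps how many calls run at once rather than how many are sent per interval, which is what bottleneck's reservoir settings address. The names createLimiter, fakeApiCall and demo are hypothetical, and fakeApiCall is only a stand-in for the real request.

```javascript
// Minimal concurrency limiter: at most `maxConcurrent` tasks run at once.
// (Hypothetical helper for illustration, not part of any library.)
function createLimiter(maxConcurrent) {
    let active = 0;
    const queue = [];

    // Start the next queued task if a slot is free
    function next() {
        if (active >= maxConcurrent || queue.length === 0) return;
        active++;
        const { task, resolve, reject } = queue.shift();
        Promise.resolve()
            .then(task)
            .then(resolve, reject)
            .finally(() => {
                active--;
                next();
            });
    }

    // Queue a task; the returned promise settles with the task's outcome
    return function schedule(task) {
        return new Promise((resolve, reject) => {
            queue.push({ task, resolve, reject });
            next();
        });
    };
}

// Stand-in for the real API request (assumption: resolves after 10 ms)
function fakeApiCall(row) {
    return new Promise(resolve =>
        setTimeout(() => resolve({ result: row.id * 2 }), 10));
}

// Runs five rows through a limiter of 2 and records the peak concurrency
async function demo() {
    const schedule = createLimiter(2);
    let running = 0;
    let peak = 0;
    const rows = [{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }, { id: 5 }];
    const results = await Promise.all(rows.map(row => schedule(async () => {
        running++;
        peak = Math.max(peak, running);
        try {
            return await fakeApiCall(row);
        } finally {
            running--;
        }
    })));
    return { results, peak };
}
```

    In the code from the question, the call inside the 'data' handler would become something like schedule(() => process_data(data)). Note that the stream will still emit rows faster than they are processed, so the waiting tasks pile up in memory. Pausing the stream while the queue is long and resuming it afterwards is one way to keep that bounded.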