javascript, node.js, node.js-stream

Synchronous Emitted Events With csv-parser


I'm trying to use the npm package csv-parser for parsing my csv files and have run into an issue with the order of events occurring.

Events are emitted in this order:

  1. 'headers': Want to insert metadata about the csv into a database and return an id value
  2. 'data': Want to use the returned id value from the headers event for all data events
  3. 'data'
  4. 'data'
  5. ...
  6. 'end'

Obviously the asynchronous nature of Node means that my slow database access in 'headers' hasn't returned by the time the first 'data' event is emitted, so I don't have the id of the csv yet. The only option I can think of is to cache all the data rows into some temporary variable and then push everything once the whole csv has been read. Considering that I may have very large csv files, this seems like a bad idea. Any suggestions on a better way of tackling this problem?

EDIT: Added some code (pseudo code, not actually tested)

const fs = require('fs');
const csv = require('csv-parser');
const uploads = require('./uploads'); // my database helper module (illustrative path)

let headerList = null;
let dataArray = [];
fs.createReadStream(path)
    .pipe(csv())
    // Parse the headers into a comma-delimited string
    .on('headers', function (headers) {
        // some parsing logic and then assigned to variable
        headerList = headers;
    })
    .on('data', function (data) {
        // Push each row into a temporary array
        dataArray.push(data);
    })
    .on('end', function () {
        // create the base upload object
        const id = uploads.createUpload(filename, headerList, new Date());

        // insert data
        uploads.insertUploadData(id, dataArray);
    });

Solution

    1. When you get the 'headers' event, unpipe() the read stream from the CSV parser. This puts the file reader into a paused state, so you don't have to buffer a bunch of rows in memory.

    2. Because data is read from disk in chunks (usually 64 kB), the CSV parser will still emit 'data' events as it continues to parse the current chunk. You'll still need to buffer a small number of rows in an array.

    3. When you have all the information you need from the database:

      1. Submit the buffered rows to the database.

      2. Remove the original 'data' event handler (the one that queues to an array) and attach one that submits rows directly to the database.

      3. pipe() the read stream back to the CSV parser (a sketch of the whole pattern follows this list).
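
    Putting those steps together, a minimal sketch of the pattern might look like the following. It reuses the names from the question (path, filename, uploads) and assumes that uploads.createUpload and uploads.insertUploadData return Promises; treat it as an illustration of the unpipe()/pipe() dance, not drop-in code.

    const fs = require('fs');
    const csv = require('csv-parser');
    const uploads = require('./uploads'); // hypothetical path to the question's DB module

    const readStream = fs.createReadStream(path);
    const parser = csv();

    let uploadId = null;
    const buffered = [];

    // Until the upload id exists, queue rows in memory.
    function bufferRow(row) {
        buffered.push(row);
    }

    parser.on('headers', async (headers) => {
        // Detach the file reader; with no pipe destination it pauses,
        // though the parser may still flush rows from its current chunk.
        readStream.unpipe(parser);

        // Slow database call (assumed to return a Promise resolving to the id).
        uploadId = await uploads.createUpload(filename, headers, new Date());

        // Swap handlers: from now on, rows go straight to the database.
        parser.removeListener('data', bufferRow);
        parser.on('data', (row) => uploads.insertUploadData(uploadId, [row]));

        // Submit the rows that were buffered while we waited on the database.
        if (buffered.length) {
            uploads.insertUploadData(uploadId, buffered.splice(0));
        }

        // Resume reading from disk.
        readStream.pipe(parser);
    });

    parser.on('data', bufferRow);
    parser.on('end', () => {
        // All rows have been parsed; some inserts may still be in flight.
    });

    readStream.pipe(parser);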


    You may also want to consider what happens if your program reads from disk and parses CSV faster than your database can accept data. Since there's no backpressure, a large number of database operations may end up queued in memory until you run out of memory.

    You should pause the file read stream if there are many pending DB operations (one way to do that is sketched below).
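
    One way to apply that advice: count the in-flight inserts and pause the parser when the count gets high, which lets pipe() backpressure stop the file reader as well. MAX_PENDING is an arbitrary threshold, insertUploadData is again assumed to return a Promise, and this handler would replace the direct-insert 'data' handler from the sketch above.

    const MAX_PENDING = 100; // arbitrary cap on in-flight inserts
    let pending = 0;

    parser.on('data', (row) => {
        pending++;
        if (pending >= MAX_PENDING) {
            // Stop 'data' events; pipe() backpressure then pauses the file reader too.
            parser.pause();
        }

        uploads.insertUploadData(uploadId, [row])
            .catch((err) => console.error(err))
            .finally(() => {
                pending--;
                if (pending < MAX_PENDING) {
                    parser.resume(); // no-op if the parser is already flowing
                }
            });
    });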