
PapaParse and Highland


I have to parse a very big CSV file in Node.js and save it in a database (an async operation) that accepts up to 500 entries at a time. Due to memory limits I have to stream the CSV file, and I want to use PapaParse for the parsing (as that worked best in my case).

Since PapaParse uses a callback-style approach to parse Node.js streams, I didn't see an easy way to combine highland (for batching and data transforms) with PapaParse. So I tried to use a PassThrough stream to write the data to, and to read that stream with highland for batching:

const csv = require('papaparse');
const fs = require('fs');
const highland = require('highland');
const { PassThrough } = require('stream');

const fileStream = fs.createReadStream('data.csv'); // illustrative path
const passThroughStream = new PassThrough({ objectMode: true });

csv.parse(fileStream, {
  step: function(row) {
    // Write data to stream
    passThroughStream.write(row.data[0]);
  },
  complete: function() {
    // Somehow "end" the stream
    passThroughStream.write(null);
  },
});

highland(passThroughStream)
  .map((data) => {
    // data transform
  })
  .batch(500)
  .map((data) => {
    // Save up to 500 entries in database (async call)
  });

Obviously that doesn't work as is and doesn't really do anything. Is something like this even possible, or is there a better way to parse very big CSV files and save their rows to a database (in batches of up to 500)?
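
For reference, a sketch of how the PassThrough approach can be made to work, reusing the requires and streams set up above. The two fixes are ending the PassThrough properly (in object mode, write(null) throws, while end() signals EOF) and actually consuming the highland chain; transformRow and saveBatch (which returns a Promise) are hypothetical placeholders:

csv.parse(fileStream, {
  step: function(row) {
    // Depending on the PapaParse version, row.data may be the row itself
    // or a one-element array wrapping it.
    passThroughStream.write(row.data[0]);
  },
  complete: function() {
    passThroughStream.end(); // signal EOF to downstream readers
  },
});

highland(passThroughStream)
  .map(transformRow)                               // per-row transform
  .batch(500)                                      // group rows into arrays of up to 500
  .flatMap((batch) => highland(saveBatch(batch)))  // run each async save in turn
  .done(() => console.log('all rows saved'));

One caveat: writing into the PassThrough without checking write()'s return value lets PapaParse outrun the database, so rows can pile up in memory; for a truly huge file the backpressure handling needs more care.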

Edit: Using the csv package (https://www.npmjs.com/package/csv) it would be possible like this (the same goes for fast-csv):

highland(fileStream.pipe(csv.parse()))
  .map((data) => {
    // data transform
  })
  .batch(500)
  .map((data) => {
    // Save up to 500 entries in database (async call)
  });
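
As written, that chain is lazy: highland does nothing until the stream is consumed. A minimal way to drive it to completion, again with a hypothetical transformRow and a Promise-returning saveBatch:

highland(fileStream.pipe(csv.parse()))
  .map(transformRow)
  .batch(500)
  .flatMap((batch) => highland(saveBatch(batch)))
  .done(() => console.log('import finished'));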

But unfortunately, neither npm package parses the CSV files correctly in all cases.


Solution

  • After a quick look at papaparse I decided to implement a CSV parser in scramjet.

    const scramjet = require('scramjet');
    // illustrative input path; db is assumed to expose insertArray()
    const fileStream = require('fs').createReadStream('data.csv');

    fileStream.pipe(new scramjet.StringStream('utf-8'))
        .csvParse(options)
        .batch(500)
        .map(items => db.insertArray('some_table', items))
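
    One caveat: like the highland chains above, scramjet streams are lazy, so the pipeline needs a consumer. If I read the scramjet API correctly, appending .run() consumes the stream and returns a Promise that resolves once every batch has been saved:

    fileStream.pipe(new scramjet.StringStream('utf-8'))
        .csvParse(options)
        .batch(500)
        .map(items => db.insertArray('some_table', items))
        .run()                                    // consume; resolves when done
        .then(() => console.log('all batches saved'))
        .catch((err) => console.error(err));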
    

    I hope that works for you. :)