Search code examples
node.jspromisebluebirdpg-promise

NodeJS, promises, streams - processing large CSV files


I need to build a function for processing large CSV files for use in a bluebird.map() call. Given the potential sizes of the file, I'd like to use streaming.

This function should accept a stream (a CSV file) and a function (that processes the chunks from the stream) and return a promise when the file is read to end (resolved) or errors (rejected).

So, I start with:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

Now, I have two inter-related issues:

  1. I need to throttle the actual amount of data being processed, so as to not create memory pressures.
  2. The function passed as the processor param is going to often be async, such as saving the contents of the file to the db via a library that is promise-based (right now: pg-promise). As such, it will create a promise in memory and move on, repeatedly.

The pg-promise library has functions to manage this, like page(), but I'm not able to wrap my ahead around how to mix stream event handlers with these promise methods. Right now, I return a promise in the handler for readable section after each read(), which means I create a huge amount of promised database operations and eventually fault out because I hit a process memory limit.

Does anyone have a working example of this that I can use as a jumping point?

UPDATE: Probably more than one way to skin the cat, but this works:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

Anyone sees a potential problem with this approach?


Solution

  • Find below a complete application that correctly executes the same kind of task as you want: It reads a file as a stream, parses it as a CSV and inserts each row into the database.

    const fs = require('fs');
    const promise = require('bluebird');
    const csv = require('csv-parse');
    const pgp = require('pg-promise')({promiseLib: promise});
    
    const cn = "postgres://postgres:password@localhost:5432/test_db";
    const rs = fs.createReadStream('primes.csv');
    
    const db = pgp(cn);
    
    function receiver(_, data) {
        function source(index) {
            if (index < data.length) {
                // here we insert just the first column value that contains a prime number;
                return this.none('insert into primes values($1)', data[index][0]);
            }
        }
    
        return this.sequence(source);
    }
    
    db.task(t => {
        return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
    })
        .then(data => {
            console.log('DATA:', data);
        }
        .catch(error => {
            console.log('ERROR:', error);
        });
    

    Note that the only thing I changed: using library csv-parse instead of csv, as a better alternative.

    Added use of method stream.read from the spex library, which properly serves a Readable stream for use with promises.