Tags: node.js, asynchronous, stream, queue, large-files

Reading and processing large and small files line by line using streams and async in Node.js


I'm having trouble processing a list of files line by line. Here is the code that I'm using:

var LineReader = require("line-by-line");
var async = require("async");
var files = [ "small.txt", "medium.txt", "large.txt" ];

var queue = async.queue(function(task, next){ console.log(task); next(); }, 10);

async.eachSeries(
    files,
    function (file, callback) {
        var lineReader = new LineReader(file, { encoding: "utf8", skipEmptyLines: true });

        lineReader.on("error", function (err) {
            callback(err);
        });

        lineReader.on("line", function (line) {
            lineReader.pause();
            queue.push(line);
        });

        queue.drain = function () {
            lineReader.resume(); // I need to resume the stream !
            callback(); // When all lines have been processed, I need to read the next file
        };
    },
    function (err) {
        if (err) return console.log(err);
        console.log("Job done.");
    }
);

I'm using async to "synchronously" process the files one after another and to process each line through a queue, and line-by-line to read each file line by line.

My problem is:

  • If I pause the stream, push the line to the queue, and then resume the stream right away, I get this error:

RangeError: Maximum call stack size exceeded

  • If I pause the stream, push the line to the queue, and wait for the queue to be empty (drain), I can't resume the stream and execute the callback:

queue.drain = function () { lineReader.resume(); callback(); };

How can I wait until all lines have been processed, and then execute the callback to process the next file?

Thank you.

UPDATE:

I found something strange with the "line-by-line" module: the "end" event is emitted twice. So I decided to refactor the code, and that is where I found the issue. Another problem: the module hasn't been updated for a year, and there are two pull requests that were sent a month ago.
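
In the meantime, one workaround I might try (just a rough sketch, not tested) is to guard the callback so it only runs once, even if the module emits "end" twice:

// Inside the async.eachSeries iterator, guard against callback
// being invoked more than once by line-by-line.
var finished = false;

lineReader.on("error", function (err) {
    if (finished) return;
    finished = true;
    callback(err);
});

lineReader.on("end", function () {
    if (finished) return;
    finished = true;
    callback();
});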

Here is my solution (assuming line-by-line worked correctly):

var LineReader = require("line-by-line");
var async = require("async");
var files = [ "small.txt", "medium.txt", "large.txt" ];

var queue = async.queue(function(task, next){ console.log(task); next(); }, 10);

async.eachSeries(
    files,
    function (file, callback) {
        var lineReader = new LineReader(file, { encoding: "utf8", skipEmptyLines: true });

        lineReader.on("error", function (err) {
            callback(err);
        });

        lineReader.on("end", function () {
            callback();
        });

        lineReader.on("line", function (line) {
            lineReader.pause();
            queue.push(line);
        });

        queue.drain = function () {
            lineReader.resume();
        };
    },
    function (err) {
        if (err) return console.log(err);
        console.log("Job done.");
    }
);

With this solution, there is only ever one line in the queue at a time. If anyone has an idea how to push more than one line before pausing the stream, please share; one idea I might try is sketched below.
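
For example (an untested sketch; it relies on queue.length(), which async's queue provides to report the number of waiting tasks), the stream could be paused only once a batch of lines is buffered:

lineReader.on("line", function (line) {
    queue.push(line);
    // Pause only when a batch of lines is waiting in the queue,
    // instead of pausing after every single line.
    if (queue.length() >= 10) {
        lineReader.pause();
    }
});

queue.drain = function () {
    lineReader.resume();
};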

I will try to find another module that doesn't have this issue, because I don't want to write a new module just for that.


Solution

  • I would solve this problem totally differently.

    With the new streams API there is no need to listen for events or pause the stream manually.
    I would use gulp and through2, like so:

    var gulp = require('gulp');
    var thr = require('through2').obj;

    function fixLine (line) {
      // do stuff with a single line of a file.
      // just return it back for no reason :)
      return line;
    }

    var files = [ "small.txt", "medium.txt", "large.txt" ];
    gulp.src(files).pipe(thr(function (vfs, enc, next) {
      // vfs - a vinyl file object holding the whole file contents.
      var str = vfs.contents.toString().split('\n').map(fixLine).join('\n');
      vfs.contents = Buffer.from(str);
      next(null, vfs);
    }));
    

    However, this is asynchronous, so there is no guarantee that the files are processed in the order they appear in the array. The lines within each file are, of course, processed in order. If the file order matters, a sequential sketch is shown below.

    I hope this helps.
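
    If keeping the file order matters, one alternative sketch (using Node's built-in readline module together with async.eachSeries, and assuming the same fixLine function as above) would be to process the files one after another while still reading them line by line:

    var fs = require('fs');
    var readline = require('readline');
    var async = require('async');

    var files = [ "small.txt", "medium.txt", "large.txt" ];

    async.eachSeries(files, function (file, done) {
      var rl = readline.createInterface({
        input: fs.createReadStream(file, { encoding: 'utf8' })
      });

      rl.on('line', function (line) {
        fixLine(line); // same per-line processing as above
      });

      rl.on('close', function () {
        done(); // only move on to the next file once this one is finished
      });
    }, function (err) {
      if (err) return console.log(err);
      console.log('Job done.');
    });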