
Node.js Streams Readable to Transform


I have been trying to use a readable and a transform stream to process a very large file. The problem that I seem to come across is that if I don't put a writable stream at the end, the program seems to terminate before the result gets returned.

Example: rstream.pipe(split()).pipe(tstream)

My tstream has an emitter that emits when a counter hits a threshold. When that threshold is set to a low number, I get a result, but when it's high, it's not returning anything. If I pipe it to a file writer, it always returns a result. Am I missing something obvious?

code:

// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');

var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
    console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {

    // THIS WORKS if I have a low counter for qtransformstream,
    // but when it's high, I do not get a result
    //   rstream.pipe(split()).pipe(qtransformstream);

    // this always works
    rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);

};

Here is the code for the QTransformStream:

// Dependencies
var Transform = require('stream').Transform,
    util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
    // Create this as a Transform Stream
    Transform.call(this, {
        objectMode: true
    });
    // Default the Qbase to 32 as an assumption
    this.Qbase = 32;
    if (Quser) {
        this.Quser = Quser;
    } else {
        this.Quser = 20;
    }
    this.Qpass = this.Quser + this.Qbase;
    this.Counter = 0;
    // Variables used as intermediates
    this.Qmin = 120;
    this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
    var Qmin = this.Qmin;
    var Qmax = this.Qmax;
    var Qbase = this.Qbase;
    var Quser = this.Quser;
    this.Counter++;
    // Emit the data after 100 reads (note: this does not stop the stream)
    if (this.Counter === 100) {
        this.emit('completed', this.Qbase, this.Quser);
    }
    // do some calcs on this.Qbase

    this.push('something not important');
    callback();
};
// export the object
module.exports = TransformStream;

Solution

  • EDIT:

    Also, I don't know how high your counter goes, but if you fill up the buffer, data stops being passed to the transform stream, in which case completed is never actually emitted because you never reach the counter limit. Try changing your highWaterMark.
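
    For example, a hedged sketch of raising that limit in the transform's constructor (highWaterMark is the standard Node.js stream option; 1024 is an arbitrary value, and the objectMode default is 16 objects):

    Transform.call(this, {
        objectMode: true,
        highWaterMark: 1024 // assumption: an arbitrary, larger buffer limit
    });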

    EDIT 2: A Little Better Explanation

    As you well know, a transform stream is a duplex stream, which basically means it can accept data from a source and send data to a destination; this is commonly referred to as reading and writing, respectively. A transform stream inherits from both the readable stream and the writable stream implemented by Node.js. There is one caveat, though: a transform stream does not have to implement the _read or _write functions; it implements _transform instead. In that sense you can think of it as the lesser-known PassThrough stream.
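
    As a sketch of that caveat (my own minimal example, not code from the question): a transform that implements neither _read nor _write, only _transform, and simply forwards its input is essentially what stream.PassThrough does.

    var Transform = require('stream').Transform,
        util = require('util');

    function Forwarder() {
        Transform.call(this, { objectMode: true });
    }
    util.inherits(Forwarder, Transform);

    // _transform covers both the readable and writable sides
    Forwarder.prototype._transform = function(chunk, encoding, callback) {
        this.push(chunk); // pass the input straight through
        callback();
    };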

    Since a transform stream implements a writable stream, you must also consider that a writable stream always needs a destination for its contents. The problem you are having is that when you create a transform stream, you can't specify a place to send your content. The only way to pass data completely through your transform stream is to pipe it to a writable stream; otherwise, your stream gets backed up and can't accept more data, because there is no place for the data to go.
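
    You can see the backup directly with a minimal sketch (the exact count is an assumption based on the default highWaterMark of 16 objects per side in objectMode):

    var Transform = require('stream').Transform;
    var t = new Transform({ objectMode: true });
    t._transform = function(chunk, encoding, callback) {
        this.push(chunk); // nothing ever consumes the readable side
        callback();
    };

    var accepted = 0;
    for (var i = 0; i < 100; i++) {
        if (!t.write(i)) break; // write() returns false once the buffers fill
        accepted++;
    }
    // Stops around 32 writes instead of accepting all 100
    console.log('accepted before backpressure: ' + accepted);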

    This is why piping to a write stream always works. The write stream alleviates the data backup by sending the data to a destination, so all of your data is piped through and the completed event is emitted.

    The reason your code works without the write stream when the sample size is small is that you aren't filling up your stream, so the transform stream can accept enough data for the completed event's threshold to be hit. As the threshold increases, the amount of data your stream can accept without sending it somewhere else (a write stream) stays the same. This causes your stream to get backed up until it can no longer accept data, which means the completed event will never be emitted.

    I would venture to say that if you increase the highWaterMark for the transform stream, you will be able to raise your threshold and still have the code work. That method is incorrect, though. Instead, pipe your stream to a write stream that sends the data to /dev/null; the way to create that write stream is:

    var writer = fs.createWriteStream('/dev/null');
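
    Then the original pipeline keeps the completed emitter working without keeping a real output file around:

    rstream.pipe(split()).pipe(qtransformstream).pipe(writer);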
    

    The section on buffering in the Node.js docs explains the error you are running into.