Search code examples
node.jsfsnode-streams

Node Streams - Listening for unpipe in a Readable Stream


So, I've created a read stream that first connects to an SFTP and starts reading from a file. At any point, my code can unpipe that readstream and do something else. For example, I might use this to get the first few rows of a CSV and stop reading.

The problem is, I don't know how to listen for the unpipe event in my readStream constructor so that I can correctly close the SFTP connection. I use a flush method in write streams, is there something like that for read streams?

Here's a simplified portion of my readStream constructor:

const Client = require('ssh2').Client,
      nom = require('noms');

function getStream (get) {
    const self = this;
    const conn = new Client();

    let client,
        fileData,
        buffer,
        totalBytes = 0,
        bytesRead = 0;

    let read = function(size,next) {
        const read = this;
        // Read each chunk of the file
        client.read(fileData, buffer, bytesRead, size, bytesRead,
            function (err, byteCount, buff, pos) {
                bytesRead += byteCount;
                read.push(buff);
                next();
            }
        );
    };

    let before = function(start) {
        // setup the connection BEFORE we start _read
        conn.on('ready', function(){
            conn.sftp(function(err,sftp) {
                sftp.open(get, 'r', function(err, fd){
                    sftp.fstat(fd, function(err, stats) {
                        client = sftp;
                        fileData = fd;
                        totalBytes = stats.size;
                        buffer = new Buffer(totalBytes);

                        start();
                    });
                });
            });
        }).connect(credentials);
    };

    return nom(read,before);
}

Later I might call myStream.pipe(writeStream) and then myStream.unpipe(). But because I have no way of listening for that unpipeevent, the reading stops, but the SFTP connection stays open and eventually times out.

Any ideas?


Solution

  • So, after doing more research, I learned that ReadStreams are not passed the unpipe event when you call readStream.unpipe(writeStream). That event is passed to just the writeStream. In order to listen for the unpipe, you need to explicitly emit an event on the readStream, like so:

    readStream.emit('unpipe');
    

    You can listen for this event anywhere, inside or outside your stream constructor, which is really convenient. So, that would make the code above look like this:

    function getStream (get) {
        /**
         * ... stuff
         * ... read()
         * ... before()
         * ... etc
         */
    
        let readStream = nom(read,before);
    
        readStream.on('unpipe', function(){
            console.log('called unpipe on read stream');
        });
    
        return readStream;
    }
    

    Moral of the story, streams already have the Event Emitter class methods, so you can emit and listen for custom events out of the box.