So, I've created a read stream that first connects to an SFTP and starts reading from a file. At any point, my code can unpipe that readstream and do something else. For example, I might use this to get the first few rows of a CSV and stop reading.
The problem is, I don't know how to listen for the unpipe
event in my readStream constructor so that I can correctly close the SFTP connection. I use a flush
method in write streams, is there something like that for read streams?
Here's a simplified portion of my readStream constructor:
const Client = require('ssh2').Client,
nom = require('noms');
function getStream (get) {
const self = this;
const conn = new Client();
let client,
fileData,
buffer,
totalBytes = 0,
bytesRead = 0;
let read = function(size,next) {
const read = this;
// Read each chunk of the file
client.read(fileData, buffer, bytesRead, size, bytesRead,
function (err, byteCount, buff, pos) {
bytesRead += byteCount;
read.push(buff);
next();
}
);
};
let before = function(start) {
// setup the connection BEFORE we start _read
conn.on('ready', function(){
conn.sftp(function(err,sftp) {
sftp.open(get, 'r', function(err, fd){
sftp.fstat(fd, function(err, stats) {
client = sftp;
fileData = fd;
totalBytes = stats.size;
buffer = new Buffer(totalBytes);
start();
});
});
});
}).connect(credentials);
};
return nom(read,before);
}
Later I might call myStream.pipe(writeStream)
and then myStream.unpipe()
. But because I have no way of listening for that unpipe
event, the reading stops, but the SFTP connection stays open and eventually times out.
Any ideas?
So, after doing more research, I learned that ReadStreams are not passed the unpipe
event when you call readStream.unpipe(writeStream)
. That event is passed to just the writeStream. In order to listen for the unpipe, you need to explicitly emit an event on the readStream, like so:
readStream.emit('unpipe');
You can listen for this event anywhere, inside or outside your stream constructor, which is really convenient. So, that would make the code above look like this:
function getStream (get) {
/**
* ... stuff
* ... read()
* ... before()
* ... etc
*/
let readStream = nom(read,before);
readStream.on('unpipe', function(){
console.log('called unpipe on read stream');
});
return readStream;
}
Moral of the story, streams already have the Event Emitter class methods, so you can emit and listen for custom events out of the box.