Tags: javascript, node.js, stream, pipe, pipeline

Node - Closing Streams Properly after Pipeline


Let's say I have the following code:

// run as an ES module so the top-level await works
import fs from 'fs';
import { pipeline } from 'stream/promises';

try {
    let size = 0;

    await pipeline(
        fs.createReadStream('lowercase.txt'),
        async function* (source) {
            for await (const chunk of source) {
                size += chunk.length;

                if (size >= 1000000) {
                    throw new Error('File is too big');
                }

                yield String(chunk).toUpperCase();
            }
        },
        fs.createWriteStream('uppercase.txt')
    );

    console.log('Pipeline succeeded.');
} catch (error) {
    console.log('got error:', error);
}

How do I make sure I am properly closing the streams in every single case? The node docs aren't much help -- they just tell me that I am going to have dangling event listeners:

stream.pipeline() will call stream.destroy(err) on all streams except:

Readable streams which have emitted 'end' or 'close'.

Writable streams which have emitted 'finish' or 'close'.

stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.
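
The best I've come up with so far (just a sketch, not something the docs spell out) is to keep my own references to the streams and destroy them explicitly in a finally block, since calling destroy() on a stream that has already closed is a no-op:

import fs from 'fs';
import { pipeline } from 'stream/promises';

const source = fs.createReadStream('lowercase.txt');
const dest = fs.createWriteStream('uppercase.txt');

try {
    await pipeline(source, dest);
    console.log('Pipeline succeeded.');
} catch (error) {
    console.log('got error:', error);
} finally {
    // harmless on success: destroying an already-closed stream does nothing
    source.destroy();
    dest.destroy();
}

But that still leaves the dangling listeners the docs warn about, and I'm not sure it covers every case.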


Solution

  • So, I find many of the node.js compound stream operations, such as pipeline() and .pipe(), to be really bad/incomplete at error handling. For example, if you just do this:

    const fs = require('fs');

    fs.createReadStream("input.txt")
      .pipe(fs.createWriteStream("output.txt"))
      .on('error', err => {
          console.log(err);
      }).on('finish', () => {
          console.log("all done");
      });
    

    You would expect that if there was an error opening the readStream, you'd get that error in your error handler here, but no, that is not the case. An error opening the input file goes unhandled. There's some logic to that: .pipe() returns the output stream, and an input error isn't an error on the output stream. But because the error isn't passed through, it's very easy to miss errors on the input stream. The .pipe() operation could have listened for errors on the input stream and forwarded them (even as a pipeErr or something different), and it could also have cleaned up the writeStream properly upon a read error. But .pipe() wasn't implemented that thoroughly. It seems to assume that there will never be an error on the input stream.

    Instead, you have to separately save the readStream object and attach an error handler to it directly in order to see that error. So, I just don't trust this compound stuff any more, and the docs never really explain how to do proper error handling. I tried to look at the code for pipeline() to see if I could understand its error handling, and that did not prove to be a fruitful endeavor.
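
    Here's a minimal sketch of that workaround (the filenames are just placeholders), attaching the error handler to the saved readStream before piping:

    const fs = require('fs');

    const readStream = fs.createReadStream("input.txt");

    // errors on the source never reach handlers attached to the
    // stream returned by .pipe(), so listen on the source directly
    readStream.on('error', err => {
        console.log("read error:", err);
    });

    readStream.pipe(fs.createWriteStream("output.txt"))
        .on('error', err => {
            console.log("write error:", err);
        }).on('finish', () => {
            console.log("all done");
        });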

    So, your particular problem seems like it could be done with a transform stream:

    const fs = require('fs');
    const { Transform } = require('stream');

    // create a fresh transform per call -- a stream cannot be reused
    // once it has ended or errored
    function makeUpperCaseTransform() {
        return new Transform({
            transform: function(chunk, encoding, callback) {
                let str = chunk.toString('utf8');
                this.push(str.toUpperCase());
                callback();
            }
        });
    }

    function upperFile(input, output) {
        return new Promise((resolve, reject) => {
            // common function for cleaning up a partial output file
            function errCleanup(err) {
                fs.unlink(output, function(e) {
                    if (e) console.log(e);
                    reject(err);
                });
            }

            let inputStream = fs.createReadStream(input, {encoding: 'utf8'});
            let outputStream = fs.createWriteStream(output, {emitClose: true});

            // have to separately listen for read/open errors
            inputStream.on("error", err => {
                // have to manually close writeStream when there was an error reading
                if (outputStream) outputStream.destroy();
                errCleanup(err);
            });
            inputStream.pipe(makeUpperCaseTransform())
                .pipe(outputStream)
                .on("error", errCleanup)
                .on("close", resolve);
        });
    }

    // sample usage
    upperFile("input.txt", "output.txt").then(() => {
        console.log("all done");
    }).catch(err => {
        console.log("got error", err);
    });
    

    As you can see, about two-thirds of this code deals with handling errors robustly (the part that the built-in operations don't do properly).
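
    If you also want the size limit from your original code, one way (again just a sketch, using the same factory shape as makeUpperCaseTransform() above) is to pass an error to the transform's callback, which destroys the transform and makes it emit 'error':

    function makeLimitedUpperCaseTransform(maxBytes = 1000000) {
        let size = 0;
        return new Transform({
            transform: function(chunk, encoding, callback) {
                size += chunk.length;
                if (size >= maxBytes) {
                    // an error passed to callback destroys the stream
                    // and emits 'error' on it
                    return callback(new Error('File is too big'));
                }
                callback(null, chunk.toString('utf8').toUpperCase());
            }
        });
    }

    Keep in mind that .pipe() won't forward that 'error' to the output stream either, so you'd attach errCleanup to the transform itself as well, the same way it's attached to the input and output streams above.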