Tags: node.js, multer, multer-s3

Cloning a Node File Object to use multiple streams in parallel (Multer)


Is it possible to clone a Node.js File object?

I've written a custom storage driver for Multer which takes an array of storage drivers in its constructor and calls ._handleFile of each driver. The goal is to save one file to multiple destinations in parallel.

However, it seems that the file stream that's opened by the disk driver messes up any subsequent reads. In my particular case I'm trying to save to a local disk + AWS-S3.

Through debugging (setTimeouts, etc.) I found out that:

  • If the file gets uploaded to S3 first, the file written to my local disk is empty.
  • If the file gets written to my local disk first, the S3 upload simply dies without any errors.

So my assumption is that multiple streams on the same file cause strange issues.

The multer disk driver does the following:

  ...
  var outStream = fs.createWriteStream(finalPath)
  file.stream.pipe(outStream)

The multer AWS S3 driver does this:

...
var upload = this.s3.upload(params)

I assume the library opens a stream.
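
I'm guessing the relevant part looks roughly like this (not verified against the multer-s3 source; the param names here are assumptions):

  // Assumed shape, not taken from the actual multer-s3 source:
  // the managed S3 uploader consumes file.stream as the request body.
  var params = {
    Bucket: opts.bucket,
    Key: key,
    Body: file.stream
  }
  var upload = this.s3.upload(params)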

I don't want to save the file first and then manually create two streams afterwards. I'd prefer to somehow duplicate the file object and send a copy off to each individual ._handleFile method.

 MultiStorage.prototype._handleFile = async function _handleFile (req, file, cb) {

   // I removed some code for this example
   ...
   const results = await Promise.all(drivers.map(({ driver }, i) => {
     return new Promise((fulfill, reject) => {
       // file -> this I believe I need to duplicate
       driver._handleFile(req, file, (error, info) => {
         fulfill({ info, error })
       })
     })
   }))
   ....

Solution

  • Answering my own question

    I wrote a little helper which creates new PassThrough streams and writes to them as data comes in.

     const { PassThrough } = require('stream');
     
     // Split the stream into $count new PassThrough streams and return them
     const splitStream = (stream, count) => {
       const streams = [...Array(count)].map(() => new PassThrough());
       stream.on('data', chunk => {
         streams.forEach(s => s.push(chunk));
       });
       stream.on('end', () => {
         streams.forEach(s => s.push(null));
       });
       return streams;
     };
    

    Now you just need to pass on your new stream(s) instead of the original stream.

    myFn(streams[0]);
    myFn(streams[1]);
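
     In the MultiStorage driver from the question, that could be wired up roughly like this (a hypothetical sketch; it assumes each sub-driver accepts a plain file object carrying its own stream):

     MultiStorage.prototype._handleFile = async function _handleFile (req, file, cb) {
       // Split the incoming stream once per driver, then hand each driver
       // a shallow copy of the file object with its own PassThrough stream.
       const streams = splitStream(file.stream, this.drivers.length);

       const results = await Promise.all(this.drivers.map((driver, i) => {
         const fileCopy = Object.assign({}, file, { stream: streams[i] });
         return new Promise((fulfill) => {
           driver._handleFile(req, fileCopy, (error, info) => {
             fulfill({ info, error });
           });
         });
       }));

       cb(null, { results });
     }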
    

     Disclaimer: This method does not take care of error handling and can cause memory leaks. You might want to consider using the pipeline() helper from the 'stream' module.
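
     For example, forwarding the source stream's 'error' event to each copy already avoids the silent-hang case (still a sketch: it ignores backpressure, which is what pipeline() would manage for you):

     const { PassThrough } = require('stream');

     // Like splitStream above, but also propagates errors so downstream
     // consumers fail fast instead of waiting forever on a dead stream.
     const splitStreamSafe = (stream, count) => {
       const streams = [...Array(count)].map(() => new PassThrough());
       stream.on('data', chunk => streams.forEach(s => s.write(chunk)));
       stream.on('end', () => streams.forEach(s => s.end()));
       stream.on('error', err => streams.forEach(s => s.destroy(err)));
       return streams;
     };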