Search code examples
javascript · node.js · node-streams

Correctly use fs write inside createReadStream on data


I am attempting to combine n binary files into a single file in javascript using streams. I have a write stream that is passed to the following function. I notice that the total written bytes does not match the actual number of bytes in the file, and is also not consistent across multiple runs.

After reading the documentation, I noticed that the write call returns a promise and is not safe to be called again until the previous promise is fulfilled. I am not sure how to make the `readStream.on('data', function (chunk) {...})` callback use await, as the function is not async, and I get the error "await is only valid in async function".

// Appends the contents of `filename` to the open write target `fileHandle`.
// NOTE(review): this is the buggy version under discussion — each
// fileHandle.write(chunk) is fired without awaiting its completion or
// honoring backpressure, so pending writes pile up and bytesWritten is
// nondeterministic across runs (exactly the symptom described above).
async function concatFile (filename, fileHandle) {
  return new Promise((resolve, reject) => {
    // small highWaterMark => many 1 KiB 'data' events, amplifying the problem
    const readStream = fs.createReadStream(filename, { highWaterMark: 1024 })

    readStream.on('data', function (chunk) {
      // read
      // BUG: the write's completion (promise/callback) is ignored; the next
      // 'data' event can fire before this write has actually finished
      fileHandle.write(chunk)
    })
    readStream.on('error', e => {
      reject(e)
    })
    readStream.on('close', function (err) {
      // close
    })
    readStream.on('end', function () {
      // done
      // 'end' only means the READ side finished; writes may still be
      // buffered, so resolving here over-reports progress
      readStream.close()
      resolve()
    })
  }) // end of Promise
}

I am using the above function in the following snippet:

  const fileWriter = fs.createWriteStream('concatBins.bin', { flags: 'w' })
  let writtenLen = 0
  // Use a const array instead of the original implicit-global object
  // (`fileList = {}` with numeric keys) — we only need the values, in order.
  const fileList = ['foo.bin', 'bar.bin']
  for (const value of fileList) {
    await concatFile(value, fileWriter)
    writtenLen = fileWriter.bytesWritten
    console.log(`bytes written ${writtenLen}`)
  }

Solution

  • You can pause the readStream until the write is done to avoid getting future data events, and then resume it when done with the write. And, you can declare the .on('data', ...) callback to be async if you want to use await. But, you do have to pause the readStream yourself, because async/await won't pause it for you.

    // Write `data` to `stream`, returning a promise that resolves once it is
    // safe to issue the next write: immediately if the chunk was accepted, or
    // after the 'drain' event if the internal buffer was full.
    // Improvement over the original: also listen for 'error' so the promise
    // cannot hang forever if the stream fails while waiting for 'drain', and
    // remove whichever listener did not fire to avoid a listener leak.
    function write(stream, data) {
        return new Promise((resolve, reject) => {
            if (stream.write(data)) {
                resolve();
            } else {
                const onDrain = () => {
                    stream.off('error', onError);
                    resolve();
                };
                const onError = (err) => {
                    stream.off('drain', onDrain);
                    reject(err);
                };
                // need to wait for the drain event before writing again
                stream.once('drain', onDrain);
                stream.once('error', onError);
            }
        });
    }
    
    // Append the contents of `filename` to `writeStream` (which is NOT ended
    // here, so callers can concatenate several files into it), honoring
    // backpressure so chunks land in order and bytesWritten is deterministic.
    //
    // Fixes vs. the original answer: the 'finalEnd' listener was written as
    // `() { ... }` — missing `=>`, a syntax error — and the manual
    // pause/resume plus synthetic 'finalEnd' event machinery is replaced by
    // `for await...of` over the readStream, which stops pulling new chunks
    // until the loop body's await completes.
    async function concatFile (filename, writeStream) {
      const readStream = fs.createReadStream(filename, { highWaterMark: 1024 });
      try {
        for await (const chunk of readStream) {
          // writable.write() returns false when its buffer is full; wait for
          // 'drain' before requesting the next chunk
          if (!writeStream.write(chunk)) {
            await new Promise((resolve) => writeStream.once('drain', resolve));
          }
        }
      } finally {
        // release the file descriptor even if the write side threw
        readStream.destroy();
      }
    }