I am attempting to combine n binary files into a single file in JavaScript using streams.
I have a write stream that is passed to the following function. I notice that the total number of bytes written does not match the actual number of bytes in the file, and that it is also not consistent across multiple runs.
After reading the documentation, I noticed that the write call returns a promise
and is not safe to call again until the previous promise is fulfilled.
I am not sure how to make the readStream.on('data', function (chunk) {...}) callback use await,
as the function is not async and I get the error "await is only valid in async function".
/**
 * Streams the contents of `filename` into `fileHandle` in chunks of at most
 * 1024 bytes.
 *
 * Each chunk is handed to `fileHandle.write()` as soon as it is read; the
 * value returned by `write()` is ignored, so nothing here waits for a write
 * to finish before issuing the next one.
 *
 * @param {string} filename - Path of the file to read.
 * @param {{ write: Function }} fileHandle - Destination exposing `write(chunk)`.
 * @returns {Promise<void>} Resolves when the read stream emits 'end';
 *   rejects with the error if the read stream emits 'error'.
 */
async function concatFile (filename, fileHandle) {
  const source = fs.createReadStream(filename, { highWaterMark: 1024 })

  // Fire-and-forget: the result of write() is deliberately not inspected.
  const forwardChunk = (chunk) => {
    fileHandle.write(chunk)
  }

  return new Promise((resolve, reject) => {
    source.on('data', forwardChunk)
    source.on('error', reject)
    source.on('close', () => {
      // nothing to do once the descriptor is closed
    })
    source.on('end', () => {
      // Every chunk has been read: release the file and settle.
      source.close()
      resolve()
    })
  })
}
I am using the above function in the following snippet:
// Concatenate every input file, in list order, into concatBins.bin.
const fileWriter = fs.createWriteStream('concatBins.bin', { flags: 'w' });
let writtenLen = 0;
// Declared with `const`: a bare `fileList = ...` assignment would create an
// implicit global and throws a ReferenceError in strict mode / ES modules.
// An array keeps the intended order explicit.
const fileList = ['foo.bin', 'bar.bin'];
for (const filename of fileList) {
  await concatFile(filename, fileWriter);
  // NOTE: bytesWritten counts only bytes already written; data still queued
  // inside the stream is not included, so this can lag behind what was sent.
  writtenLen = fileWriter.bytesWritten;
  console.log(`bytes written ${writtenLen}`);
}
You can pause the readStream until the write is done, to avoid getting further data
events, and then resume it when the write has finished. You can also declare the .on('data', ...)
callback as async
if you want to use await
in it. But you do have to pause the readStream yourself, because async/await won't pause it for you.
/**
 * Writes `data` to `stream` and returns a promise that settles when it is OK
 * to proceed with more writes.
 *
 * @param {import('stream').Writable} stream - Destination writable stream.
 * @param {Buffer|string|Uint8Array} data - Chunk passed to `stream.write()`.
 * @returns {Promise<void>} Resolves immediately when `stream.write()` returns
 *   true, otherwise once the stream emits 'drain'. Rejects if the stream
 *   emits 'error' while waiting, or if `stream.write()` throws.
 */
function write(stream, data) {
  return new Promise((resolve, reject) => {
    if (stream.write(data)) {
      resolve();
      return;
    }

    // Buffer is full: wait for 'drain', but also watch for 'error' so a
    // failed write rejects instead of leaving the promise pending forever.
    const onDrain = () => {
      stream.off('error', onError);
      resolve();
    };
    const onError = (err) => {
      stream.off('drain', onDrain);
      reject(err);
    };
    stream.once('drain', onDrain);
    stream.once('error', onError);
  });
}
/**
 * Appends the contents of `filename` to `writeStream`, reading in chunks of
 * at most 1024 bytes and waiting for each chunk to be accepted before the
 * next one is read.
 *
 * Async iteration pulls one chunk at a time from the read stream, so no
 * manual pause()/resume() bookkeeping is needed, and the read stream is
 * destroyed automatically if the loop exits early because of an error.
 *
 * `writeStream` is not ended here, so the caller can keep appending to it.
 *
 * @param {string} filename - Path of the file to read.
 * @param {import('stream').Writable} writeStream - Destination stream.
 * @returns {Promise<void>} Resolves once every chunk has been handed to
 *   `writeStream` via the `write()` helper; rejects on a read error or when
 *   `write()` rejects.
 */
async function concatFile (filename, writeStream) {
  const readStream = fs.createReadStream(filename, { highWaterMark: 1024 });
  for await (const chunk of readStream) {
    // write() resolves only when the destination can take more data.
    await write(writeStream, chunk);
  }
}