I need to read a logfile with thousands of lines and write each line to a Mongo database. I am reading the file using a node stream. I am splitting the file into 'lines' using the 'split' npm package. The MongoDB write will take a lot longer than the logfile read due to network considerations.
My core code looks like this:
var fs = require('fs');
var split = require('split');

var chunkCount = 0;
var readableStream = fs.createReadStream(filename);

readableStream
    .pipe(split()) // This splits the data into 'lines'
    .on('data', function (chunk) {
        chunkCount++;
        slowAsyncFunctionToWriteLogEntryToDatabase(chunk); // This will take ages
    })
    .on('end', function () {
        // resolve the promise which bounds this process
        defer.resolve({ v: 3, chunkCount: chunkCount });
    });
Do I need to worry that the MongoDB system will be hammered by the number of writes being queued? Presumably the node pipe back-pressure mechanism won't know that lots of db writes are being queued? Is there any way to 'slow' the readable stream so that it waits for each MongoDB insert to finish before it reads the next line from the logfile? Am I worrying unnecessarily?
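One way to throttle the read is to pause() the line stream before each database write and resume() it when the write completes. A minimal sketch, assuming slowAsyncFunctionToWriteLogEntryToDatabase can take a completion callback (the snippet above does not show one):

var fs = require('fs');
var split = require('split');

var readableStream = fs.createReadStream(filename);
var lineStream = readableStream.pipe(split());

lineStream.on('data', function (chunk) {
    // Stop emitting lines while this insert is in flight
    lineStream.pause();
    slowAsyncFunctionToWriteLogEntryToDatabase(chunk, function () {
        // Hypothetical completion callback: resume reading once the write is done
        lineStream.resume();
    });
});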
Since working with pause() and resume() seems to have some problems, I will offer another option: using a Transform stream.
var Transform = require('stream').Transform;

var chunkCount = 0;
var myTransform = new Transform({
    transform(chunk, encoding, cb) {
        chunkCount++;
        // The next chunk is not processed until cb() is called; once the
        // internal buffer fills up, pipe() pauses the upstream file read
        syncFunctionToWriteLogEntryWithCallback(chunk, function () {
            cb();
        });
    },
    flush(cb) {
        // split() has already emitted the final line through transform(),
        // so there is nothing left to write here
        cb();
    }
});
readableStream
    .pipe(split())
    .pipe(myTransform);
Using a transform stream lets you supply a callback (cb) that you call only when you have finished processing each chunk, so the stream will not feed you the next line until the current database write has completed.
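If you also need to know when the whole file has been processed (for example, to resolve the promise from the question), you can listen for the 'finish' event on the transform stream; a minimal sketch, reusing the defer and chunkCount names from the question:

myTransform.on('finish', function () {
    // 'finish' fires once end() has been called and every transform() callback has completed
    defer.resolve({ v: 3, chunkCount: chunkCount });
});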