In my code I have a nested fs.createReadStream(...) being called inside a readline.on("line", ...) event handler. Initially the program failed outright because over 1,000 read streams would be created by fs and the failsafe to stop would trigger.
I implemented a check before opening each read stream to see how many files were open, and if more than I wanted were open I would stall until some closed. However, because the code runs asynchronously, over 600 of the checks pass and open files before the other tasks begin stalling. I am having trouble figuring out a way to limit the number of read streams; any solutions would be greatly appreciated.
The purpose of the script overall is to compare two very large files against each other line by line. Each file has well over 200k lines.
I have excluded the functions formatLine
and checkLine
, and refactored my code, to try to keep the content relevant to the question.
var fs = require('fs');
const readline = require('readline');

const files = {open: 0};
const wait = ms => new Promise(r => setTimeout(r, ms));

// for debugging purposes, just looking at how many files are open
setInterval(() => console.log(files.open), 500);

async function stallIfNeeded() {
    while (files.open > 50) {
        await wait(500);
    }
}

async function checkLineExists(line) {
    await stallIfNeeded();
    let readStream = fs.createReadStream("./largeFileOfUncertianValues.csv");
    files.open = files.open + 1;
    const rl = readline.createInterface({
        input: readStream,
        crlfDelay: Infinity
    });
    const hasLine = await new Promise(res => {
        let found = false;
        rl.on('close', () => {
            files.open = files.open - 1;
            res(found);
        });
        rl.on("line", oline => {
            const hasLine = oline.includes(line);
            if (hasLine) {
                found = true;
                rl.close();
            }
        });
    });
    return hasLine;
}

(async () => {
    const sourceStream = fs.createReadStream('largeFileOfKnownValues.csv');
    const sourceRl = readline.createInterface({
        input: sourceStream,
        crlfDelay: Infinity
    });
    let writeStream = fs.createWriteStream("missing.csv");
    let cntr = 0;
    await new Promise(sRes => {
        sourceRl.on("line", async line => {
            // these are conditions left out for simplicity.
            // I need to format every line and check to make sure it is
            // something I want to search for.
            const lineFormatted = formatLine(line);
            const skipLine = checkLine(lineFormatted);
            if (skipLine) return;
            const exists = await checkLineExists(lineFormatted);
            if (!exists) console.log("doesn't exist");
            if (!exists) writeStream.write(line + "\n");
            if (++cntr % 50 == 0) {
                console.log("another 50 done");
                console.log(cntr + " so far");
            }
        });
        sourceRl.on("close", sRes);
    });
})();
If you really want to minimize memory usage and/or support arbitrarily large files with decent performance, then you should load one of the files into some sort of database that can do an indexed lookup on the data. Then, you can cycle through the other file line by line, normalize each line and run a query to see if it's present in the database.
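A minimal sketch of that idea, assuming the better-sqlite3 package (any store with an indexed lookup would work; the lines.db file, the source_lines table and the compareWithDb name are just illustration, not something the approaches below depend on):

const fs = require('fs');
const readline = require('readline');
const Database = require('better-sqlite3');   // assumption: any indexed store would do

function normalizeLine(line) {
    return line.toLowerCase();
}

async function compareWithDb(sourceFile, targetFile) {
    // load the source file into an indexed table once (the PRIMARY KEY is the index)
    const db = new Database('lines.db');      // illustrative database file name
    db.exec('CREATE TABLE IF NOT EXISTS source_lines (line TEXT PRIMARY KEY)');

    const insert = db.prepare('INSERT OR IGNORE INTO source_lines (line) VALUES (?)');
    const source = readline.createInterface({ input: fs.createReadStream(sourceFile) });
    db.exec('BEGIN');
    for await (const line of source) {
        insert.run(normalizeLine(line));
    }
    db.exec('COMMIT');

    // then stream the target file and do one indexed lookup per line
    const lookup = db.prepare('SELECT 1 FROM source_lines WHERE line = ?');
    const notFounds = [];
    const target = readline.createInterface({ input: fs.createReadStream(targetFile) });
    for await (const line of target) {
        if (!lookup.get(normalizeLine(line))) {
            notFounds.push(line);
        }
    }
    db.close();
    return notFounds;
}

It can be called the same way as the compare() functions below, and memory stays flat because neither file is held in memory.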
Absent that, here are a couple of approaches without a database. The first one loads one set of data into an in-memory Set object and then cycles through the second file line by line, checking each line against the Set (essentially an in-memory database lookup).
const fs = require('fs');
const readline = require('readline');

function normalizeLine(line) {
    return line.toLowerCase();
}

async function compare(sourceFile, targetFile) {
    // read in all source lines into a normalized Set
    const source = readline.createInterface({ input: fs.createReadStream(sourceFile) });
    const sourceLines = new Set();
    for await (const line of source) {
        sourceLines.add(normalizeLine(line));
    }

    // cycle through the target file and check each line against the Set
    const notFounds = [];
    const target = readline.createInterface({ input: fs.createReadStream(targetFile) });
    for await (const line of target) {
        if (!sourceLines.has(normalizeLine(line))) {
            notFounds.push(line);
        }
    }
    return notFounds;
}

compare("source.txt", "target.txt").then(result => {
    if (!result.length) {
        console.log("All target lines found in source");
    } else {
        console.log("Not found in source", result);
    }
}).catch(err => {
    console.log(err);
});
The second one uses your approach of cycling through the first file line by line and, for each of those lines, cycling through the second file line by line. This will be excruciatingly slow for any large data set, but it uses very little memory, so it will scale to arbitrarily large files.
It uses the async-iterator (for await ... of) interface of readline to cycle through the lines one by one, and it awaits the close event to avoid any build-up of open files.
const fs = require('fs');
const readline = require('readline');

function normalizeLine(line) {
    return line.toLowerCase();
}

async function compare(sourceFile, targetFile) {
    // cycle through the target file line by line,
    // scanning the source file for each line
    const target = readline.createInterface({ input: fs.createReadStream(targetFile) });
    const notFounds = [];
    for await (const line of target) {
        let found = await findLineInFile(sourceFile, line);
        if (!found) {
            notFounds.push(line);
        }
    }
    return notFounds;
}

compare("source.txt", "target.txt").then(result => {
    if (!result.length) {
        console.log("All target lines found in source");
    } else {
        console.log("Not found in source", result);
    }
}).catch(err => {
    console.log(err);
});
// destroy a stream and wait for it to actually close
function streamDestroy(stream) {
    return new Promise((resolve, reject) => {
        stream.once('close', resolve);
        stream.once('error', reject);
        stream.destroy();
    });
}

// scan one file line by line for a normalized match
async function findLineInFile(filename, targetLine) {
    const lookLine = normalizeLine(targetLine);
    const stream = fs.createReadStream(filename);
    const source = readline.createInterface({ input: stream });
    for await (const line of source) {
        if (normalizeLine(line) === lookLine) {
            await streamDestroy(stream);
            return true;
        }
    }
    return false;
}