I'm trying to read and write the first 1 million lines of the Pile corpus with node.js. The problem is that I get an output file with one million lines, but the 1,000,000th line of the output is only around line 900,000 of the original file.
This is what I used:
var fs = require("fs");
var buffer = '';
var rs1 = fs.createReadStream('00.jsonl', "ascii");
var ws = fs.createWriteStream('pile_1M.txt', { flags: 'w', encoding: 'ascii' });
var lines_num = 0;
var max_read = 1000000;

rs1.on('data', function (chunk) {
    rs1.pause();
    var lines = (buffer + chunk).split(/\r?\n/g);
    for (var i = 0; i < lines.length && lines_num < max_read; ++i) {
        lines_num++;
        ws.write(lines[i] + "\n");
    }
    if (lines_num >= max_read) {
        rs1.close();
        console.log("close");
    }
    rs1.resume();
});

rs1.on('close', function () {
    ws.close();
});
EDIT:
Thanks to @jfriend00, I came up with the following working solution:
var fs = require("fs");
var buffer = '';
var rs1 = fs.createReadStream('/media/user/hdd2/pile/00.jsonl', "utf-8");
var ws = fs.createWriteStream('pile_1M_2.txt', { flags: 'w', encoding: 'utf-8' });
var lines_num = 0;
var max_read = 100000;

// Strip the JSONL wrapper: keep only the value of the "text" field.
function json_line_to_text(line) {
    var index = line.indexOf(', "meta": {"');
    if (index == -1) {
        return line;
    }
    // Skip the leading '{"text": "' (10 chars) and the closing quote.
    return line.substring(10, index - 1);
}

rs1.on('data', function (chunk) {
    rs1.pause();
    var lines = (buffer + chunk).split(/\r?\n/g);
    for (var i = 0; i < lines.length && lines_num < max_read; ++i) {
        if (i == lines.length - 1) {
            // The last piece may be a partial line: save it and
            // prepend it to the next chunk instead of writing it.
            buffer = lines[i];
        } else if (lines_num < max_read) {
            ws.write(json_line_to_text(lines[i]) + "\n");
            lines_num++;
        }
    }
    if (lines_num >= max_read) {
        rs1.close();
        console.log("close");
    }
    rs1.resume();
});

rs1.on('close', function () {
    ws.close();
});
I included the json_line_to_text function because I initially thought it was the true cause of the problem; it is not.
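For reference, the same first-N-lines task can also be written with Node's built-in readline module, which does the chunk-to-line splitting internally. This is just a sketch of the alternative, reusing the same file paths and the json_line_to_text function above:

var fs = require("fs");
var readline = require("readline");

var rl = readline.createInterface({
    input: fs.createReadStream('/media/user/hdd2/pile/00.jsonl', { encoding: 'utf-8' }),
    crlfDelay: Infinity // treat \r\n as a single line break
});
var out = fs.createWriteStream('pile_1M_2.txt', { encoding: 'utf-8' });
var count = 0;

rl.on('line', function (line) {
    out.write(json_line_to_text(line) + "\n");
    if (++count >= 100000) rl.close(); // stop after the first N lines
});
rl.on('close', function () {
    out.end();
});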
Your reads are not necessarily whole lines, but your code appears to assume they are. A chunk will most likely not end exactly at the end of a line.
So, you do this:
var lines = (buffer+chunk).split(/\r?\n/g);
And, then you count those as whole lines, but the last one in the split is not necessarily a whole line.
And, then you do this:
ws.write(lines[i]+"\n");
Where you add a \n to the end of every split piece, but the last piece you split may NOT have been the end of a line. So, you end up breaking chunks into new lines at places that didn't have a linebreak character originally.
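To make that concrete, here's a tiny illustration (the chunk contents are invented) of what happens when a chunk boundary falls mid-line:

// Two consecutive 'data' chunks; the boundary falls in the middle of "line2".
var chunk1 = "line1\nli";
var chunk2 = "ne2\nline3\n";
console.log(chunk1.split(/\r?\n/g)); // [ 'line1', 'li' ]
// Writing "li" + "\n" fabricates a line break that isn't in the file.
// Carrying "li" over and splitting it with the next chunk restores the real line:
console.log(("li" + chunk2).split(/\r?\n/g)); // [ 'line2', 'line3', '' ]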
The usual way to handle this is to save the last piece of the split, not counting it as a line and not writing it out; instead, prepend it to the next read. That way, you only write out a \n where there really was a line boundary.
And, you also probably have split issues when the line boundary is \r\n but the chunk you read ends right after the \r, with the \n arriving in the next chunk; your code doesn't handle that properly. This issue is also fixed by not processing the last piece of the split and saving it to be combined with the next read: then you know you always have the whole line ending before processing it.
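A minimal sketch of that pattern, reusing the rs1 and ws streams from the question:

var leftover = '';
rs1.on('data', function (chunk) {
    var lines = (leftover + chunk).split(/\r?\n/g);
    leftover = lines.pop(); // the last piece may be partial: hold it back
    for (var i = 0; i < lines.length; ++i) {
        ws.write(lines[i] + "\n"); // only complete lines get a "\n"
    }
});
rs1.on('end', function () {
    if (leftover) ws.write(leftover); // flush any final unterminated line
});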
Plus, I don't see where this code ever resets buffer to exclude the data you've already processed and written out. In fact, this code never seems to put anything into the buffer variable at all. You should be using it as described above to hold the last partial line.
On a separate topic, you should also be observing flow control on this:
ws.write(lines[i]+"\n");
.write() returns a boolean that tells you whether the stream's internal buffer is full, in which case you should wait for the drain event before writing more. It's all explained in the Node.js stream documentation.
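A minimal sketch of that backpressure handling in isolation, again reusing rs1 and ws (here writing raw chunks just to show the write/drain mechanics):

rs1.on('data', function (chunk) {
    if (!ws.write(chunk)) {
        // The write buffer is full: stop reading until it drains.
        rs1.pause();
        ws.once('drain', function () {
            rs1.resume();
        });
    }
});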