I have a function in which I read a CSV file as a readable stream using the "pipeline" method, split it by rows, and transform the data of each row; then I add the data to an array. When the pipeline is finished, I insert all the data into a database.
This is the relevant part of the code:
pipeline(storageStream as Readable, split(), this.FilterPipe(), this.MapData(result));

public MapData(result: Array<string>): MapStream {
  return mapSync((filteredData: string) => {
    const trimmed: string = filteredData.trim();
    if (trimmed.length !== 0) {
      result.push(trimmed);
    }
  });
}
We have sometimes run into memory limits since we started uploading a large number of very large CSV files, so we decided to split the logic into insertion batches so that we don't hold too much data in memory at once.
So I thought of handling the read data in batches: for every batch (say, 100 rows of the file), I would trigger the "MapData" function and insert the resulting array into the DB.
Is there any way to add a condition so that MapData is triggered every X rows? Or is there any other solution that would meet this requirement?
Thanks in advance!
The following code shows a transform stream that buffers incoming objects (or arrays of objects) until it has 100 of them and then pushes them onwards as an array:
var stream = require("stream");

var t = new stream.Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Collect incoming objects (or arrays of objects) in a buffer.
    this.buffer = (this.buffer || []).concat(chunk);
    if (this.buffer.length >= 100) {
      // Once 100 items have accumulated, push them onwards as one array.
      this.push(this.buffer);
      this.buffer = [];
    }
    callback();
  },
  flush(callback) {
    // Push whatever is left over when the stream ends.
    if (this.buffer && this.buffer.length > 0) this.push(this.buffer);
    callback();
  }
}).on("data", console.log);

for (var i = 0; i < 250; i++) t.write(i);
t.end();
You can include such a transform stream in your pipeline.
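For illustration, here is a rough sketch of how it could be wired into a pipeline like the one from the question. The insertBatch helper is an assumption standing in for your actual database insert, storageStream and split() are taken to be the ones from your code, and your FilterPipe/MapData trimming steps would go between split() and the batching transform:

import { pipeline, Readable, Transform, Writable } from "stream";

// Assumed to exist, as in the question's code.
declare const storageStream: Readable;
declare function split(): Transform;
// Hypothetical helper that inserts one batch of rows into the database.
declare function insertBatch(rows: string[]): Promise<void>;

// Batching transform as above: collect rows until 100 have accumulated,
// then push them onwards as a single array.
const batch = new Transform({
  objectMode: true,
  transform(chunk, _encoding, callback) {
    const self = this as Transform & { buffer?: string[] };
    self.buffer = (self.buffer || []).concat(chunk);
    if (self.buffer.length >= 100) {
      this.push(self.buffer);
      self.buffer = [];
    }
    callback();
  },
  flush(callback) {
    const self = this as Transform & { buffer?: string[] };
    if (self.buffer && self.buffer.length > 0) this.push(self.buffer);
    callback();
  }
});

// Writable sink that inserts one batch at a time; calling the callback only
// after the insert finishes applies backpressure, so memory stays bounded.
const insertSink = new Writable({
  objectMode: true,
  write(rows: string[], _encoding, callback) {
    insertBatch(rows).then(() => callback(), callback);
  }
});

pipeline(storageStream, split(), batch, insertSink, (err) => {
  if (err) console.error("Pipeline failed:", err);
  else console.log("All batches inserted.");
});

Because the sink only acknowledges a batch once the insert has completed, the stream is paused automatically whenever the database is slower than the file read.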
And here's the same in TypeScript. It can very probably be done more elegantly, but I am no TypeScript expert.
import { Transform } from "stream";

class MyTransform extends Transform {
  buffer: Array<any> = [];
}

var t = new MyTransform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    var that = this as MyTransform;
    that.buffer = that.buffer.concat(chunk);
    if (that.buffer.length >= 100) {
      this.push(that.buffer);
      that.buffer = [];
    }
    callback();
  },
  flush(callback) {
    var that = this as MyTransform;
    if (that.buffer.length > 0) this.push(that.buffer);
    callback();
  }
}).on("data", console.log);

for (var i = 0; i < 250; i++) t.write(i);
t.end();
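One possibly tidier variant, in case it helps, is to subclass Transform and override _transform and _flush directly, which avoids the "this as MyTransform" casts. The class name BatchTransform and the configurable batch size are just choices for this sketch:

import { Transform, TransformCallback } from "stream";

class BatchTransform extends Transform {
  private buffer: any[] = [];

  constructor(private readonly batchSize: number = 100) {
    super({ objectMode: true });
  }

  _transform(chunk: any, _encoding: BufferEncoding, callback: TransformCallback): void {
    // Accept single objects or arrays of objects, as in the examples above.
    this.buffer = this.buffer.concat(chunk);
    if (this.buffer.length >= this.batchSize) {
      this.push(this.buffer);
      this.buffer = [];
    }
    callback();
  }

  _flush(callback: TransformCallback): void {
    // Emit whatever is left when the stream ends.
    if (this.buffer.length > 0) this.push(this.buffer);
    callback();
  }
}

const t = new BatchTransform(100).on("data", console.log);
for (let i = 0; i < 250; i++) t.write(i);
t.end();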