node.js · typescript · pipeline · event-stream

How to write batches of data in a Node.js stream pipeline?


I have a function in which I read a CSV file as a readable stream using the "pipeline" method, split it by rows, and transform the data of each row, then add the data to an array. When the pipeline is finished, I insert all the data into a database.

This is the relevant part of the code:

// split() and mapSync() come from the "event-stream" package
pipeline(storageStream as Readable, split(), this.FilterPipe(), this.MapData(result), (error) => {
  // completion callback required by the callback form of stream.pipeline;
  // `result` is inserted into the DB once the pipeline has finished
});

public MapData(result: Array<string>): MapStream {
    return mapSync((filteredData: string) => {
      // trim each row and collect the non-empty ones into `result`
      const trimmed: string = filteredData.trim();
      if (trimmed.length !== 0) {
        result.push(trimmed);
      }
    });
}

We have sometimes run into memory limits since we started uploading a large number of very big CSV files, so we decided to split the logic into insertion batches so that we don't hold so much data in memory at once.

So I thought of handling the read data in batches: for every batch (let's say 100 rows of the file), I would trigger the "MapData" function and insert the resulting array into the DB.

Is there any option to add a condition so that MapData is triggered every X rows? Or is there any other solution that might meet this requirement?

Thanks in advance!


Solution

  • The following code shows a transform stream that buffers incoming objects (or arrays of objects) until it has 100 of them and then pushes them onwards as an array:

    const stream = require("stream"); // or: import * as stream from "stream";

    var t = new stream.Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        // collect incoming chunks (concat also flattens arrays of objects)
        this.buffer = (this.buffer || []).concat(chunk);
        if (this.buffer.length >= 100) {
          // a full batch: push it downstream as one array
          this.push(this.buffer);
          this.buffer = [];
        }
        callback();
      },
      flush(callback) {
        // push whatever is left as a final, smaller batch
        if (this.buffer && this.buffer.length > 0) this.push(this.buffer);
        callback();
      }
    }).on("data", console.log);
    for (var i = 0; i < 250; i++) t.write(i);
    t.end();
    

    You can include such a transform stream in your pipeline; a sketch of how it could slot into your pipeline follows after the TypeScript version below.

    And here's the same in TypeScript. It can probably be done more elegantly, but I am no TypeScript expert.

    import { Transform } from "stream";

    class MyTransform extends Transform {
      buffer: Array<any> = [];
    }
    var t = new MyTransform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        var that = this as MyTransform;
        // collect incoming chunks (concat also flattens arrays of objects)
        that.buffer = that.buffer.concat(chunk);
        if (that.buffer.length >= 100) {
          // a full batch: push it downstream as one array
          this.push(that.buffer);
          that.buffer = [];
        }
        callback();
      },
      flush(callback) {
        var that = this as MyTransform;
        // push whatever is left as a final, smaller batch
        if (that.buffer.length > 0) this.push(that.buffer);
        callback();
      }
    }).on("data", console.log);
    for (var i = 0; i < 250; i++) t.write(i);
    t.end();
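
    And here is a rough, untested sketch of how such a batching transform could slot into your pipeline, with each batch written to the database before the next one is processed, so only one batch is held in memory at a time. It makes some assumptions: insertBatch stands in for whatever bulk-insert call you already have, storageStream is your readable from storage, and split() comes from event-stream as in your question.

    import { pipeline, Readable, Transform, Writable } from "stream";
    import { split } from "event-stream";

    // Placeholders for things that exist in your code base:
    declare const storageStream: Readable;                             // the CSV readable stream
    declare function insertBatch(rows: Array<string>): Promise<void>;  // your bulk insert

    const BATCH_SIZE = 100;

    // Same batching idea as above, but the buffer lives in a closure,
    // which keeps the TypeScript typing simple.
    let buffer: Array<string> = [];
    const batcher = new Transform({
      objectMode: true,
      transform(row, encoding, callback) {
        const trimmed = String(row).trim();
        if (trimmed.length !== 0) buffer.push(trimmed);  // your MapData filtering
        if (buffer.length >= BATCH_SIZE) {
          this.push(buffer);                             // emit a full batch
          buffer = [];
        }
        callback();
      },
      flush(callback) {
        if (buffer.length > 0) this.push(buffer);        // emit the final partial batch
        callback();
      }
    });

    // Receives one array of up to BATCH_SIZE rows at a time and only asks for
    // more once the insert has finished, so backpressure keeps memory bounded.
    const dbWriter = new Writable({
      objectMode: true,
      write(batch: Array<string>, encoding, callback) {
        insertBatch(batch)
          .then(() => callback())
          .catch(callback);                              // surface DB errors to the pipeline
      }
    });

    pipeline(storageStream, split(), batcher, dbWriter, (error) => {
      if (error) console.error("CSV import failed:", error);
    });

    Because the Writable's callback is only invoked after insertBatch resolves, the pipeline pauses the CSV read while a batch is being inserted instead of accumulating everything in one big array.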