I have a Readable stream which provides objects to a Transform stream (a CsvFormatterStream<I, O> from the @fast-csv/format npm package). The CSV stream is then piped to an fs Writable stream to write a CSV file. Please see the example at the bottom of the post.
I'm looking for a way to split the objects from the Readable stream into chunks of size n, so that it produces CSV files with at most n data rows each.
For example, with 32 objects and n = 10, it should produce 4 CSV files: 3 CSV files with 10 rows each and 1 CSV file with 2 rows.
How can I achieve this?
UPD. I would like to have a custom stream with a configurable n which receives objects from another stream and produces y readable streams, where y is the number of chunks of max size n. I've already implemented it and will post it once SO allows me to do so.
Prerequisites:
const { Readable } = require('node:stream');
const {format} = require('@fast-csv/format');
const fs = require('fs');
const path = require('path');
const { pipeline } = require('node:stream/promises');
(async () => {
  try {
    const readable = new Readable({ objectMode: true });
    const csvStream = format({ headers: true });
    const fileStream = fs.createWriteStream(path.join(process.cwd(), 'test.csv'), {
      flags: 'w',
    });

    const objects = createObjects(32);
    objects.forEach(obj => readable.push(obj));
    readable.push(null);

    await pipeline(readable, csvStream, fileStream);
    console.log('Success!');
  } catch (err) {
    console.error(err);
  }
})();

function createObjects(n) {
  const objects = [];
  for (let i = 0; i < n; i++) {
    const obj = createObject(i);
    objects.push(obj);
  }
  return objects;
}

function createObject(i) {
  return {
    id: i,
    name: `Obj #${i}`,
  };
}
The splitting can be done by implementing a Transform stream which accumulates rows and, once the limit is reached, releases them as a new readable stream. We can then iterate over the resulting streams with an asynchronous iterator.
SplitterStream class:
class SplitterStream extends Transform {
  constructor(limit, opts) {
    super({ readableObjectMode: true, writableObjectMode: true });
    this.limit = limit;
    this.opts = opts;
    this.pushedCount = 0;
    this.currStream = new PassThrough(this.opts);
  }

  _transform(row, _encoding, callback) {
    this.currStream.write(row);
    this.pushedCount++;
    // Once the limit is reached, close the current chunk stream,
    // emit it downstream and start a new one
    if (this.pushedCount === this.limit) {
      this.currStream.end();
      this.push(this.currStream);
      this.currStream = new PassThrough(this.opts);
      this.pushedCount = 0;
    }
    callback();
  }

  _flush(callback) {
    // If the last rows were not pushed yet - end and push them before the end,
    // otherwise the downstream pipeline for the last chunk never finishes
    if (this.pushedCount > 0) {
      this.currStream.end();
      this.push(this.currStream);
      this.currStream = null;
      this.pushedCount = 0;
    }
    callback();
  }
}
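A side note on memory (my own commentary, not part of the original answer): each chunk is buffered entirely inside its PassThrough until the consumer reaches it, and _transform ignores the boolean returned by currStream.write(row), so a larger limit simply means more rows held in memory per chunk. Since SplitterStream forwards opts to every PassThrough, the object-mode buffer size can at least be made explicit; the values below are purely illustrative:

// Hypothetical configuration sketch: opts is passed straight to each PassThrough,
// so an object-mode highWaterMark can be sized to match the chunk limit.
const splitterStream = new SplitterStream(1000, {
  readableObjectMode: true,
  writableObjectMode: true,
  highWaterMark: 1000, // counted in objects because the streams are in object mode
});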
Example of usage. With limit = 10 and 32 objects it produces 4 files, as expected:
const { Readable, Transform, PassThrough, Writable } = require('node:stream');
const {format} = require('@fast-csv/format');
const fs = require('fs');
const path = require('path');
const { pipeline } = require('node:stream/promises');
class SplitterStream extends Transform {
  constructor(limit, opts) {
    super({ readableObjectMode: true, writableObjectMode: true });
    this.limit = limit;
    this.opts = opts;
    this.pushedCount = 0;
    this.currStream = new PassThrough(this.opts);
  }

  _transform(row, _encoding, callback) {
    this.currStream.write(row);
    this.pushedCount++;
    // Once the limit is reached, close the current chunk stream,
    // emit it downstream and start a new one
    if (this.pushedCount === this.limit) {
      this.currStream.end();
      this.push(this.currStream);
      this.currStream = new PassThrough(this.opts);
      this.pushedCount = 0;
    }
    callback();
  }

  _flush(callback) {
    // If the last rows were not pushed yet - end and push them before the end,
    // otherwise the downstream pipeline for the last chunk never finishes
    if (this.pushedCount > 0) {
      this.currStream.end();
      this.push(this.currStream);
      this.currStream = null;
      this.pushedCount = 0;
    }
    callback();
  }
}
(async () => {
  try {
    // Create and fill the readable
    const readable = new Readable({ objectMode: true });
    const splitterStream = new SplitterStream(10, { readableObjectMode: true, writableObjectMode: true });
    const objects = createObjects(32);
    objects.forEach(obj => readable.push(obj));
    readable.push(null);

    readable.pipe(splitterStream);

    // Each emitted chunk is itself a readable stream; write each one to its own CSV file
    let i = 1;
    for await (const _readable of splitterStream) {
      const csvStream = format({ headers: true });
      const fileStream = fs.createWriteStream(path.join(process.cwd(), `test_${i++}.csv`), {
        flags: 'w',
      });
      await pipeline(_readable, csvStream, fileStream);
    }
    console.log('Success!');
  } catch (err) {
    console.error(err);
  }
})();
function createObjects(n) {
  const objects = [];
  for (let i = 0; i < n; i++) {
    const obj = createObject(i);
    objects.push(obj);
  }
  return objects;
}

function createObject(i) {
  return {
    id: i,
    name: `Obj #${i}`,
  };
}
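One small possible simplification (an alternative I'd suggest, not something the code above requires): instead of creating an empty object-mode Readable and pushing the objects manually, Readable.from() can build the source stream directly from the array (or any iterable) and takes care of ending it:

// Sketch of an alternative source stream; createObjects() is the helper defined above.
const readable = Readable.from(createObjects(32));
readable.pipe(splitterStream);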