I have a Readable stream in object mode that I push data into, like this:
const getReadStream = () => {
const stream = new Readable({ objectMode: true, read: () => {} });
const get = async (page = 1) => {
const { data } = await client
.request(...)
.catch((error) => {
stream.emit('error', error);
return { data: undefined };
});
const results = parseFn(data);
if (results.length > 0) {
results.forEach((row) => stream.push(row));
get(page + 1);
} else {
stream.push(null);
}
};
get();
return stream;
};
I want to consume it with a Writable stream, like this:
/**
 * Feeds the stream from `getReadStream()` through one or two pipelines.
 *
 * The first pipeline (transform1 -> transform2 -> someWritableStream) is
 * always started. Unless `certainCondition` is falsy, a second pipeline
 * (transform3 -> transform4 -> writeStream2) is attached to the same
 * readable stream as well.
 *
 * NOTE(review): `pipeline` is presumably the promise-based variant, since its
 * result is returned and passed to Promise.all; confirm against the imports.
 *
 * @returns a promise for the first pipeline alone, or for both pipelines
 */
const consumeStream = async () => {
  const readableStream = getReadStream();
  const pipeline1 = pipeline(
    // The original referenced an undefined `stream` here.
    readableStream,
    transform1,
    transform2,
    someWritableStream,
  );
  if (!certainCondition) {
    return pipeline1;
  }
  const writeStream2 = new Writable({
    objectMode: true,
    write: (rows: any[], _, callback) => {
      // Pass a rejection to the callback so the pipeline fails instead of
      // stalling forever on a write that never completes.
      somePromise(rows).then(() => callback(), callback);
    },
  });
  const pipeline2 = pipeline(
    readableStream,
    transform3,
    transform4,
    writeStream2,
  );
  return Promise.all([pipeline1, pipeline2]);
};
My question is: in the case where consumeStream proceeds with pipeline2, is this the correct way to attach multiple writable streams to a single readable stream (so that the number of objects processed by pipeline1 equals the number of objects processed by pipeline2)? And if it is not, what is the correct way to implement this?
You're going to have issues with the original stream data being already in progress or potentially ended by the time you start your second pipeline.
If you use a PassThrough stream to duplicate data coming off of the original, then your method will work.
Here's a little script that demonstrates this:
#!/usr/bin/node
const { promises: fs,
createReadStream,
createWriteStream } = require('node:fs'),
{ setTimeout } = require('node:timers/promises'),
{ PassThrough,
promises: { pipeline } } = require('node:stream'),
assert = require('node:assert');
// Input file that both demonstrations read from.
const srcFile = '/tmp/foobar';
// Output of the first pipeline in each demonstration.
const sinkFile1 = '/tmp/foobarSink1';
// Output of the second pipeline in each demonstration.
const sinkFile2 = '/tmp/foobarSink2';
/**
 * (Re)creates the source file with 1000 lines of "foobar".
 *
 * Any previous content of `srcFile` is replaced. The file is written in one
 * call rather than with one open/append/close cycle per line; the resulting
 * bytes are the same.
 *
 * @returns {Promise<void>} resolves once the file has been written
 */
async function mkSrc() {
  await fs.writeFile(srcFile, 'foobar\n'.repeat(1000));
}
/**
 * Opens a new read stream over the source file.
 *
 * @returns a read stream for `srcFile`, as created by `createReadStream`
 */
function getReadableStream() {
  const source = createReadStream(srcFile);
  return source;
}
/**
 * Opens a new write stream to the given file.
 *
 * @param file - path handed to `createWriteStream`
 * @returns the write stream for `file`
 */
function getWritableStream(file) {
  const sink = createWriteStream(file);
  return sink;
}
/**
 * Verifies that both sink files hold exactly the same text.
 *
 * The two files are independent, so they are read in parallel. The
 * comparison uses `assert.strictEqual` rather than the legacy, loosely
 * comparing `assert.equal`.
 *
 * @returns {Promise<void>} resolves when the contents match
 * @throws {AssertionError} when the contents differ; file-system errors from
 *   reading either file are propagated as well
 */
async function chkIt() {
  const [chk1, chk2] = await Promise.all([
    fs.readFile(sinkFile1, 'utf8'),
    fs.readFile(sinkFile2, 'utf8'),
  ]);
  assert.strictEqual(chk1, chk2);
}
/**
 * Demonstration without a PassThrough: the same readable is handed to two
 * pipelines, the second one attached a second after the first.
 *
 * Regenerates the source file, runs both pipelines to completion, then
 * compares the two sink files and prints which outcome was observed.
 */
async function test1() {
  await mkSrc();

  const source = getReadableStream();
  const firstCopy = pipeline(source, getWritableStream(sinkFile1));

  // Give the first pipeline a head start before attaching the second one.
  await setTimeout(1000);
  const secondCopy = pipeline(source, getWritableStream(sinkFile2));

  await Promise.all([firstCopy, secondCopy]);

  try {
    await chkIt();
    console.log('Not using passthrough results in same stream data');
  } catch {
    // Any failure of the check is reported as a mismatch.
    console.error('Not using passthrough results in different stream data');
  }
}
/**
 * Demonstration with a PassThrough: the readable is piped into a PassThrough
 * up front, and the delayed second pipeline reads from that PassThrough
 * instead of from the original stream.
 *
 * Regenerates the source file, runs both pipelines to completion, then
 * compares the two sink files and prints which outcome was observed.
 */
async function test2() {
  await mkSrc();

  const source = getReadableStream();
  // Tee the source before any pipeline starts consuming it.
  const tee = new PassThrough();
  source.pipe(tee);

  const firstCopy = pipeline(source, getWritableStream(sinkFile1));

  // Same head start as in the variant without a PassThrough.
  await setTimeout(1000);
  const secondCopy = pipeline(tee, getWritableStream(sinkFile2));

  await Promise.all([firstCopy, secondCopy]);

  try {
    await chkIt();
    console.log('Using passthrough results in same stream data');
  } catch {
    // Any failure of the check is reported as a mismatch.
    console.error('Using passthrough results in different stream data');
  }
}
/**
 * Runs the two demonstrations one after the other; both write to the same
 * files, so they must not overlap.
 */
async function main() {
  await test1();
  await test2();
}

// Handle a failure explicitly instead of leaving the promise floating:
// print the error and make the script exit with a non-zero status.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
The results printed are
Not using passthrough results in different stream data
Using passthrough results in same stream data
So in your case you would do something like
let stream = getStreamSomehow(),
passthrough;
if(conditionChk) {
passthrough = new Passthrough();
stream.pipe(passthrough);
}
let pipeline1 = pipeline(stream,...);
if(!conditionChk)
return pipeline1;
let pipeline2 = pipeline(passthrough,...);
doOtherStuff();
return Promise.all([pipeline1,pipeline2]);