Search code examples
node.js, node.js-stream

Using `pipeline` from `node:stream/promises` for multiple writable destinations


I have a Readable stream in object mode that I'm pushing data into, like this:

const getReadStream = () => {
    const stream = new Readable({ objectMode: true, read: () => {} });

    const get = async (page = 1) => {
       const { data } = await client
           .request(...)
           .catch((error) => {
               stream.emit('error', error);
               return { data: undefined };
            });
    
       const results = parseFn(data);
    
       if (results.length > 0) {
           results.forEach((row) => stream.push(row));
           get(page + 1);
       } else {
           stream.push(null);
       }
    };
    
    get();

    return stream;
};

I want to consume it into one or more Writable streams, like this:

/**
 * Consume one Readable with a single pipeline or, when `certainCondition`
 * holds, with two pipelines that share the same source.
 *
 * NOTE(review): both pipelines read from the same `readableStream`. Whether
 * each one sees every row depends on both being attached before data starts
 * flowing. Nothing is awaited between the two `pipeline(...)` calls here.
 *
 * @returns {Promise} pipeline1's promise alone, or `Promise.all` of both pipelines
 */
const consumeStream = async () => {

    const readableStream = getReadStream();

    const pipeline1 = pipeline(
        readableStream,
        transform1,
        transform2,
        someWritableStream,
    );

    if (!certainCondition) {
        return pipeline1;
    }

    const writeStream2 = new Writable({
        objectMode: true,
        write: (rows: any[], _, callback) => {
            // Pass a rejection to the callback so the pipeline fails instead of
            // waiting forever for a callback that never comes.
            somePromise(rows).then(() => callback(), (error) => callback(error));
        },
    });

    const pipeline2 = pipeline(
        readableStream,
        transform3,
        transform4,
        writeStream2,
    );

    return Promise.all([pipeline1, pipeline2]);
};

My question is: in the case where consumeStream proceeds with pipeline2, is this the correct way to implement multiple writable streams with only one readable stream (the number of objects processed by pipeline1 must equal the number of objects processed by pipeline2)? And if it is not, what is the correct way to implement this?


Solution

  • You're going to run into problems if the original stream is already flowing, or has possibly already ended, by the time you start your second pipeline.

    If you use a PassThrough stream to duplicate data coming off of the original, then your method will work.

    Here's a little script that demonstrates this:

    #!/usr/bin/node
    
    const { promises: fs,
            createReadStream,
            createWriteStream } = require('node:fs'),
                 { setTimeout } = require('node:timers/promises'),
     { PassThrough,
       promises: { pipeline } } = require('node:stream'),
                         assert = require('node:assert');
    
    // File generated by mkSrc(), plus the two sink files each demo writes to.
    const srcFile = '/tmp/foobar';
    const sinkFile1 = '/tmp/foobarSink1';
    const sinkFile2 = '/tmp/foobarSink2';
    
    /**
     * (Re)create `srcFile` so that it contains 1000 lines of "foobar\n".
     *
     * One write produces the same content as 1000 sequential
     * writeFile/appendFile calls, with a single filesystem round trip.
     */
    async function mkSrc() {
      await fs.writeFile(srcFile, 'foobar\n'.repeat(1000));
    }
    
    /** Open a new read stream over `srcFile`; every call returns a fresh stream. */
    function getReadableStream() {
      const source = createReadStream(srcFile);
      return source;
    }
    
    /** Open a new write stream targeting `file`. */
    function getWritableStream(file) {
      const sink = createWriteStream(file);
      return sink;
    }
    
    /**
     * Assert that both sink files contain exactly the same text.
     *
     * @throws {assert.AssertionError} when the two files differ
     * @throws {Error} when either file cannot be read
     */
    async function chkIt() {
      // The two reads are independent, so run them concurrently.
      const [chk1, chk2] = await Promise.all([
        fs.readFile(sinkFile1, 'utf8'),
        fs.readFile(sinkFile2, 'utf8'),
      ]);
      assert.strictEqual(chk1, chk2);
    }
    
    /**
     * Demo without a PassThrough: run pipeline() twice on the same Readable,
     * starting the second pipeline one second after the first.
     *
     * Prints whether the two sink files ended up identical. Only an assertion
     * failure from chkIt() is reported as "different stream data"; any other
     * error (for example, an unreadable file) is rethrown instead of being
     * misreported.
     */
    async function test1() {
      await mkSrc();
      const stream = getReadableStream();
      const prom1 = pipeline(stream, getWritableStream(sinkFile1));
      // Delay so the first pipeline is already consuming (or has finished) the source.
      await setTimeout(1000);
      const prom2 = pipeline(stream, getWritableStream(sinkFile2));
      await Promise.all([prom1, prom2]);
      try {
        await chkIt();
        console.log('Not using passthrough results in same stream data');
      } catch (err) {
        if (!(err instanceof assert.AssertionError)) {
          throw err;
        }
        console.error('Not using passthrough results in different stream data');
      }
    }
    
    /**
     * Demo with a PassThrough: tee the source into a PassThrough before the
     * first pipeline starts, then start the second pipeline from that
     * PassThrough one second later.
     *
     * Prints whether the two sink files ended up identical. Only an assertion
     * failure from chkIt() is reported as "different stream data"; any other
     * error is rethrown instead of being misreported.
     */
    async function test2() {
      await mkSrc();
      const stream = getReadableStream();
      const passthrough = new PassThrough();
      // Attach the copy first, so it receives chunks from the very start and
      // buffers them until the second pipeline reads them.
      stream.pipe(passthrough);
      const prom1 = pipeline(stream, getWritableStream(sinkFile1));
      await setTimeout(1000);
      const prom2 = pipeline(passthrough, getWritableStream(sinkFile2));
      await Promise.all([prom1, prom2]);
      try {
        await chkIt();
        console.log('Using passthrough results in same stream data');
      } catch (err) {
        if (!(err instanceof assert.AssertionError)) {
          throw err;
        }
        console.error('Using passthrough results in different stream data');
      }
    }
    
    
    /** Run both demos in sequence. */
    async function main() {
      await test1();
      await test2();
    }

    // Report unexpected failures and exit non-zero instead of leaving an
    // unhandled promise rejection.
    main().catch((err) => {
      console.error(err);
      process.exitCode = 1;
    });
    
    

    The printed results are:

    Not using passthrough results in different stream data
    Using passthrough results in same stream data
    

    So in your case you would do something like this:

    let stream = getStreamSomehow(),
        passthrough;
    if(conditionChk) {
      passthrough = new Passthrough();
      stream.pipe(passthrough);
    }
    let pipeline1 = pipeline(stream,...);
    if(!conditionChk)
      return pipeline1;
    let pipeline2 = pipeline(passthrough,...);
    doOtherStuff();
    return Promise.all([pipeline1,pipeline2]);