Search code examples
node.jsstringstreamreadable

Node.js stops when reading from multiple Readable streams


After creating a stream (A), creating another stream (B) and reading stream (B), the reading process stops from the stream (A). How can I solve this problem?

Node.js v14.18.1

import * as readline from 'readline';
import { Readable } from 'stream';

async function  main() {

    const  streamA = Readable.from('a');
    const  readerA = readline.createInterface({
        input: streamA,
        crlfDelay: Infinity
    });

    var  stopCase = false;
    if (stopCase) {

        const  streamB = Readable.from('b');
        const  readerB = readline.createInterface({
            input: streamB,
            crlfDelay: Infinity
        });

        console.log('readB');
        for await (const line of readerB) {
            console.log(line);
        }
    }
    console.log(`readerA.closed = ${'closed' in readerA}`);

    console.log('readA');
    for await (const line of readerA) {
        console.log(line);
    }
    console.log('success');
}
main();

Output(stopCase=true):

readB
b
readerA.closed = true
readA

Output(stopCase=false):

readerA.closed = false
readA
a
success

Solution

  • The issue is that as soon as you do this:

    const readerA = readline.createInterface({
        input: streamA,
        crlfDelay: Infinity
    });
    

    Then, streamA is now ready to flow and readerA is ready to generate events as soon as you hit the event loop. When you go into the stopCase block and hit the for await (const line of readerB), that will allow streamA to flow which will allow readerA to fire events.

    But, you aren't listening for the readerA events when they fire and thus it finishes the streamA content it had while you aren't listening.

    You can see how it works better if you don't create readerA until after you're done with the stopCase block. Because then streamA and readerA aren't yet flowing when you hit the await inside of the stopCase block.

    This is what I would call a growing pain caused by trying to add promises onto the event driven streams. If you leave the stream in a flowing state and you were going to use await to read those events, but you then await some other promise, all your events on that first stream fire when you aren't yet listening. It doesn't know you're waiting to use await on it. You set it up to flow so as soon as the interpreter hits the event loop, it starts flowing, even though you aren't listening with await.

    I've run into this before in my own code and the solution is to not set a stream up to flow until you're either just about to use await to read it or until you have a more traditional event handler configured to listen to any events that flow. Basically, you can't configure two streams for use with for await (...) at the same time. Configure one stream, use it with your for await (...), then configure the other. And, be aware of any other promises used in your processing of the for await (...) loop too. There are lots of ways to goof up when using that structure.

    In my opinion, it would work more reliably if a stream was actually put in a different state to be used with promises so it will ONLY flow via the promise interface. Then, this kind of thing would not happen. But, I'm sure there are many challenges with that implementation too.

    For example, if you do this:

    import * as readline from 'readline';
    import { Readable } from 'stream';
    
    async function main() {
        var stopCase = true;
        console.log(`stopCase = ${stopCase}`);
        if (stopCase) {
    
            const streamB = Readable.from('b');
            const readerB = readline.createInterface({
                input: streamB,
                crlfDelay: Infinity
            });
    
            console.log('readB');
            for await (const line of readerB) {
                console.log(line);
            }
        }
        const streamA = Readable.from('a');
        const readerA = readline.createInterface({
            input: streamA,
            crlfDelay: Infinity
        });
        console.log(`streamA flowing = ${streamA.readableFlowing}`);
        console.log(`readerA.closed = ${!!readerA.closed}`);
    
        console.log('readA');
        for await (const line of readerA) {
            console.log(line);
        }
        console.log('success');
    }
    main();
    

    Then, you get all the output:

    stopCase = true
    readB
    b
    streamA flowing = true
    readerA.closed = false
    readA
    a
    success
    

    The reason you never get the console.log('success') is probably because you hit the for await (const line of readerA) { ...} loop and it gets stopped there on a promise that has no more data. Meanwhile, nodejs notices that there is nothing left in the process that can create any future events so it exits the process.

    You can see that same concept in play in an even simpler app:

    async function main() {
        await new Promise(resolve => {
            // do nothing
        });
        console.log('success');
    }
    main();
    

    It awaits a promise that never completes and there are no event creating things left in the app so nodejs just shuts down with ever logging success.