Tags: javascript, node.js, typescript, stream, node-streams

Node.js async streams backpressure not working


I have two async processes: one that produces data and one that consumes it. They work at different rates, so my idea was to leverage Node.js streams to automatically handle the backpressure between them. I tried to build a solution by extending the stream.Readable and stream.Writable classes, using a different delay in the _read and _write implementations to simulate the different rates in the real world (the readable is faster than the writable in my case), and an async generator as the data source. Both streams work in object mode and have highWaterMark = 2, so I expected the readable stream to adapt to the writable stream's rate once the buffer is full, but this isn't happening. As shown in the output, the readable stream keeps pushing data even after the highWaterMark is reached. What am I doing wrong?

import { Readable, Writable } from 'stream';

const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

async function* asyncGenerator() {
    let i = 0;

    while (i++ < 10) {
        yield { data: i };
    }
}

class TestReadable extends Readable {
    public delay: number;
    private _tag = '[Readable]';
    private _generator = asyncGenerator();

    constructor(delay: number) {
        super({
            objectMode: true,
            highWaterMark: 2,
        });

        this.delay = delay;
    }

    async _read(size: number) {
        while (true) {
            await sleep(this.delay);
            const { value, done } = await this._generator.next();
            const bufferFull = this.push(value);
            console.log(this._tag, `Pushed ${JSON.stringify(value)}`, this.readableLength);

            if (done) {
                this.push(null);
                break;
            }

            if (bufferFull) {
                break;
            }
        }
    }
}

class TestWritable extends Writable {
    public delay: number;
    private _tag = '[Writable]';

    constructor(delay: number) {
        super({
            objectMode: true,
            highWaterMark: 2,
        });

        this.delay = delay;
    }

    async _write(chunk: any, encoding: BufferEncoding, callback: (error?: (Error | null)) => void) {
        await sleep(this.delay);
        console.log(this._tag, `Received ${JSON.stringify(chunk)}`);
        callback();
    }
}

(async() => {
    const readable = new TestReadable(1000);
    const writable = new TestWritable(3000);

    readable.pipe(writable);
})();
[Readable] Pushed {"data":1} 0
[Readable] Pushed {"data":2} 0
[Readable] Pushed {"data":3} 1
[Writable] Received {"data":1}
[Readable] Pushed {"data":4} 2
[Readable] Pushed {"data":5} 3
[Readable] Pushed {"data":6} 4
[Writable] Received {"data":2}
[Readable] Pushed {"data":7} 3
[Readable] Pushed {"data":8} 4
[Readable] Pushed {"data":9} 5
[Writable] Received {"data":3}
[Readable] Pushed {"data":10} 6
[Readable] Pushed undefined 7
[Writable] Received {"data":4}
[Writable] Received {"data":5}
[Writable] Received {"data":6}
[Writable] Received {"data":7}
[Writable] Received {"data":8}
[Writable] Received {"data":9}
[Writable] Received {"data":10}
[Writable] Received undefined

Process finished with exit code 0


Solution

    Summary

    In an implementation of _read(), you can't do this sequence of events without some undesirable consequences:

    Asynchronous wait
    stream.push(data)
    Asynchronous wait
    stream.push(data)
    

    As soon as you return control back to the event loop after calling stream.push(data), the stream assumes your _read() operation is done and will call it again immediately if the buffer isn't full. The caller has no way to know that your first _read() operation is still gathering more data and will still do more .push() operations. That is the contract/API for ._read().

    So as soon as you call stream.push() and return control back to the event loop, your processing from that call to _read() should be done.

    You absolutely can do this in _read():

    Asynchronous wait
    stream.push(data)
    

    Or even this:

    Asynchronous wait
    stream.push(data)
    stream.push(data)
    stream.push(data)
    

    Just not this (which is what you were doing):

    Asynchronous wait
    stream.push(data)
    Asynchronous wait
    stream.push(data)
    
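    In concrete code, the allowed shape looks roughly like this. This is only a minimal sketch of a _read() method on a Readable subclass, not code from the question or the answer; fetchBatch() is a hypothetical stand-in for whatever asynchronous source actually produces your data:

    async _read(size) {
        // One asynchronous wait to gather data...
        const batch = await fetchBatch(size); // hypothetical async data source
        // ...then any number of synchronous push() calls before returning
        // control to the event loop.
        for (const item of batch) {
            this.push(item);
        }
        if (batch.length === 0) {
            this.push(null); // no more data
        }
    }

    The broken pattern would be to await again between the push() calls inside that loop.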

    Long Explanation

    The problem is caused by an intersection of how streams work and how your code is written. To summarize, the issue is that you're calling .push() multiple times in the same _read(), with an asynchronous wait before each one.

    That isn't how the caller of _read() expects things to work. You are certainly allowed to go do some asynchronous operation and then call .push() one or more times in the same event cycle. But, things get messed up if you call .push() and then return control back to the event loop and then, on some later tick of the event loop, call .push() again from the same _read() call. And, that's what your code is doing.

    The design of _read() is such that you can take as much time as you want before calling .push(). But as soon as you call .push() and return control back to the event loop, the stream sees that there's room in the stream buffer and that you've already called .push() (so you must be done), and it calls _read() again. But your previous _read() wasn't done yet. So now you have multiple _read() loops running at the same time, which surprisingly doesn't blow up, but ends up reading too much data and forces the buffer to grow larger than the highWaterMark, because the bookkeeping assumes that once a call to _read() has called .push() and returned control back to the event loop, it must be done. Yours wasn't done, and there is no other way to communicate that your call to _read() isn't finished yet.

    There are multiple different types of fixes.

    1. You can call this.pause() until your _read() is really done and then call this.resume() (a sketch of this appears below). That does fix the issue, but may have some other side effects on the processing of the stream.

    2. You can just read one object at a time and thus do one .push() per _read(). As soon as you call .push() and return control back, the stream will just call _read() again.

    3. You can buffer the objects you are collecting and then .push() them all into the stream in one synchronous loop.

    #2 would be the simplest to implement, and in your case, where you only get one object at a time anyway (there is always a delay between objects becoming available), it wouldn't have any performance downside.
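
    For completeness, here is a rough sketch of what option #1 (pause/resume) might look like, reusing the question's TestReadable fields (sleep, this.delay, this._generator). This is only an illustration of the idea, not code from the answer, and as noted above it can have side effects on how the stream flows:

    async _read(size) {
        // Option #1 (sketch): pause the stream until this _read() is really done,
        // then resume. The answer notes this works but can have side effects.
        this.pause();
        while (true) {
            await sleep(this.delay);
            const { value, done } = await this._generator.next();
            if (done) {
                this.push(null);
                break;
            }
            if (!this.push(value)) {
                // Internal buffer is full; stop until _read() is called again.
                break;
            }
        }
        this.resume();
    }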

    Below is an implementation (in plain JavaScript) of option #3 that uses local buffering, but all three fixes work.

    import { Readable, Writable } from 'stream';
    
    const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));
    
    async function* asyncGenerator() {
        let i = 0;
    
        while (i++ < 10) {
            yield { data: i };
        }
    }
    
    class TestReadable extends Readable {
    
        constructor(delay) {
            super({
                objectMode: true,
                highWaterMark: 2,
            });
    
            this.delay = delay;
            this._tag = '[Readable]';
            this._generator = asyncGenerator();
        }
    
        async _read(size) {
            console.log(this._tag, `Starting read(${size})`);
            let readCnt = size;
            const buffer = [];
            let eof = false;
            // Gather up to `size` objects locally, with an asynchronous wait for each.
            while (readCnt > 0) {
                await sleep(this.delay);
                const { value, done } = await this._generator.next();
                if (done) {
                    eof = true;
                    break;
                }
    
                buffer.push(value);
                --readCnt;
            }
            // Push everything collected in one synchronous loop, so control never
            // goes back to the event loop between .push() calls.
            for (let value of buffer) {
                const bufferFull = !this.push(value);
                console.log(this._tag, `Pushed ${JSON.stringify(value)}, length = ${this._readableState.length}, full = ${bufferFull}`);
            }
            // Signal end-of-data only after any buffered values have been pushed.
            if (eof) {
                this.push(null);
            }
            console.log(this._tag, `Ending read()`);
        }
    }
    
    class TestWritable extends Writable {
    
        constructor(delay) {
            super({
                objectMode: true,
                highWaterMark: 2,
            });
    
            this.delay = delay;
            this._tag = '[Writable]';
        }
    
        async _write(chunk, encoding, callback) {
            await sleep(this.delay);
            console.log(this._tag, `Received ${JSON.stringify(chunk)}`);
            callback();
        }
    }
    
    (async () => {
        const readable = new TestReadable(1000);
        const writable = new TestWritable(3000);
    
        readable.pipe(writable);
    })();
    

    Other fixes/changes to the code:

    1. Your bufferFull logic was backwards. this.push() returns true when there's more room in the buffer, not when it's full. Because my modified code pays attention to the size argument in _read(size), it no longer uses that return value for flow control.
    2. I've implemented controls to respect the size argument passed to _read() and not read or push more than that.
    3. When done is true in the result of await this._generator.next(), there is no value, but you were still pushing that undefined value and then later pushing null. The value when done is true is whatever the generator returns at the end; your generator has no final return statement, so value is undefined when done === true (see the short snippet below).
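
    As a quick illustration of that last point, here is a standalone snippet (not part of the stream code) showing what a generator's final next() result looks like when there is no return value:

    async function* gen() {
        yield 1;
        // No final `return`, so there is no value to go with done === true.
    }

    (async () => {
        const g = gen();
        console.log(await g.next()); // { value: 1, done: false }
        console.log(await g.next()); // { value: undefined, done: true }
    })();

    That undefined is what ended up in your stream as the "Pushed undefined" and "Received undefined" lines in your output.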

    Here's an implementation of option #2 (reading one object at a time):

    async _read(size) {
        console.log(this._tag, `Starting read(${size})`);
        await sleep(this.delay);
        const { value, done } = await this._generator.next();
        if (done) {
            console.log(this._tag, 'End of read data');
            this.push(null);
        } else {
            this.push(value);
            console.log(this._tag, `Pushed ${JSON.stringify(value)}, length = ${this._readableState.length}`);
        }
        console.log(this._tag, `Ending read()`);
    }
    

    You might think it's inefficient to push just one object per call, but it actually isn't, because as soon as you push an object and there's more room in the buffer, the stream will immediately schedule a check (on the next tick of the event loop) to call _read() again. So the while loop inside your function has just been replaced by a little logic in the caller that calls _read() again. Not much difference.

    And, this implementation is by far the simplest of the three solutions I've outlined.