Tags: javascript, node.js, pipeline, node-streams

Node.js stream behaviour: pipeline callback not called


Why does pipeline never call its callback? Also, the transform function stops being called after 16 chunks.

For example:

const { Readable, Transform, pipeline } = require('stream');

// Shared state: next value to emit, and every chunk the transform has seen
let count = 1;
const data = [];

// Object-mode source that emits the numbers 1 to 20, then signals end-of-stream
const readable = new Readable({
    objectMode: true,
    read() {
        this.push(count++);
        if (count > 20) {
            this.push(null);
        }
    }
});

// Pass-through transform that records and logs each chunk
const transform = new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
        data.push(chunk);
        console.log('transform - chunk: ', chunk.toString());
        callback(null, chunk);
    }
});

// This callback is the one that never fires
pipeline(
    readable,
    transform,
    (error) => {
        if (error) console.log('pipeline callback - ERROR: ', error);
        else console.log('pipeline callback - data: ', data);
    }
);


Solution

  • The fact that the transform function is not called again after the 16th call is a hint. The reason is documented here: https://nodejs.org/api/stream.html#stream_new_stream_writable_options

    In objectMode the transform has a default highWaterMark of 16. Because the transform function also pushes data to its own readable side via the callback, that readable buffer fills up, as there is no further writable to consume the readable's data. The stream therefore pauses due to backpressure, which starts at the transform's readable side, flows up to the transform's writable side, and then on up to the original readable stream, which is finally paused. Since the transform's readable side is never drained, the transform never finishes, and so pipeline never calls its callback.

    • transform-readable - its buffer is full after 16 pushed chunks, so the transform function is no longer called
    • transform-writable - keeps accepting chunks from the upstream readable and buffers roughly the next 16
    • readable - keeps reading until its own buffer is full, roughly the 16 chunks after that, then pauses
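The three stages listed above can be observed by inspecting the buffer lengths once everything has stalled. This is only a minimal sketch: it assumes an endless source (hypothetical, unlike the question's 20-item source) so that every buffer has a chance to fill, and the names `reads`, `transformCalls` and `callbackFired` are illustrative counters, not part of the question.

```javascript
const { Readable, Transform, pipeline } = require('stream');

let reads = 0;            // how many times the source's read() ran
let transformCalls = 0;   // how many times the transform function ran
let callbackFired = false;

// Hypothetical endless source: pushes 1, 2, 3, ... and never pushes null
const source = new Readable({
    objectMode: true,
    read() {
        reads++;
        this.push(reads);
    }
});

// Pass-through transform, as in the question, with nothing reading its output
const transform = new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
        transformCalls++;
        callback(null, chunk);
    }
});

pipeline(source, transform, () => {
    callbackFired = true;
});

// Give the streams time to stall, then report where the chunks are sitting
setTimeout(() => {
    console.log('transform calls:', transformCalls);
    console.log('transform readable buffer:', transform.readableLength);
    console.log('transform writable buffer:', transform.writableLength);
    console.log('source readable buffer:', source.readableLength);
    console.log('total reads from source:', reads);
    console.log('pipeline callback fired:', callbackFired);
}, 50);
```

The exact writable-side and total counts may differ slightly between Node.js versions, which is why the sketch prints them rather than assuming them.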

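    So with the defaults, the original readable pauses after about 16 * 3 chunks, or roughly 48 reads. (In the example above the source only produces 20 chunks, so it runs out before its own buffer fills; it is the transform that stalls.)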
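Following from the explanation above, one way to let the pipeline finish is to give the transform's readable side something to drain into. The sketch below is one possible fix rather than the only one: `sink` and `done` are hypothetical names that do not appear in the question, and the collecting has been moved from the transform into the sink.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

let count = 1;
const data = [];

// Same 20-item object-mode source as in the question
const readable = new Readable({
    objectMode: true,
    read() {
        this.push(count++);
        if (count > 20) {
            this.push(null);
        }
    }
});

// Pass-through transform
const transform = new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
        callback(null, chunk);
    }
});

// Hypothetical sink (not in the question): consumes the transform's output
// so its readable buffer never stays full
const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
        data.push(chunk);
        callback();
    }
});

// Wrap the pipeline callback in a promise so completion is easy to observe
const done = new Promise((resolve, reject) => {
    pipeline(readable, transform, sink, (error) => {
        if (error) reject(error);
        else resolve(data);
    });
});

done.then((result) => console.log('pipeline callback - data: ', result));
```

Calling `transform.resume()` or attaching a `'data'` listener to the transform should have a similar effect, since either one also drains the transform's readable side.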

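    When not in objectMode, the highWaterMark is counted in bytes rather than objects: the default for a buffer/string stream is 16384 bytes (16 KiB), and from Node.js 22 onward it is 65536 bytes (64 KiB). Calling setEncoding on a readable can change the meaning of this, since the buffer then holds strings rather than bytes.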
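The defaults mentioned above can be checked on whichever Node.js version is in use. This is a small sketch using the `readableHighWaterMark` and `writableHighWaterMark` properties; the stream names are illustrative only.

```javascript
const { Readable, Writable } = require('stream');

// Streams with no-op implementations, created only to inspect their defaults
const objectStream = new Readable({ objectMode: true, read() {} });
const byteStream = new Readable({ read() {} });
const byteSink = new Writable({
    write(chunk, encoding, callback) {
        callback();
    }
});

// In objectMode the limit is a number of objects
console.log('objectMode readable:', objectStream.readableHighWaterMark);

// Otherwise the limit is a number of bytes; the value depends on the Node.js version
console.log('byte readable:', byteStream.readableHighWaterMark);
console.log('byte writable:', byteSink.writableHighWaterMark);

// An explicit highWaterMark option overrides the default
const small = new Readable({ objectMode: true, highWaterMark: 4, read() {} });
console.log('custom objectMode readable:', small.readableHighWaterMark);
```

Lowering the highWaterMark this way would make the stall in the question appear after fewer chunks, but it would not remove it, because the transform's output still has no consumer.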