
Is `stream` reading a consuming process in Node.js?


A stream is an abstract interface for working with streaming data in Node.js. The node:stream module provides an API for implementing the stream interface.

When I was learning about Node.js streams, I ran into cases where I wasn't sure whether reading a stream is a consuming process.

For example, I have this shared code:

import { Readable } from 'stream'

class MyCustomReadableStream extends Readable {
  constructor() {
    super();
    this.data = ['data1', 'data2', 'data3'];
  }

  _read() {
    // Called when the stream wants more data: emit the next chunk,
    // or push(null) to signal end-of-stream.
    const chunk = this.data.shift();

    if (chunk) {
      this.push(chunk);
    } else {
      this.push(null);
    }
  }
}

Below are four situations that consume the stream with slightly different code:


Situation 1:

import { createServer } from "http";

const customStream = new MyCustomReadableStream();

customStream.on('data', (chunk) => {
  console.log('Received chunk of data:', chunk.toString());
});

customStream.on('end', () => {
  console.log('Custom stream reading is complete.');
});


createServer().listen(0)

/*
// Output: 
Received chunk of data: data1
Received chunk of data: data2
Received chunk of data: data3
Custom stream reading is complete.
*/

Situation 2:

import { createServer } from "http";

const customStream = new MyCustomReadableStream();

setTimeout(() => {
    customStream.on('data', (chunk) => {
        console.log('1Received chunk of data:', chunk.toString());
    });
}, 1000)

customStream.on('end', () => {
  console.log('Custom stream reading is complete.');
});


createServer().listen(0)

/*
// output:
1Received chunk of data: data1
1Received chunk of data: data2
1Received chunk of data: data3
Custom stream reading is complete.
*/

Situation 3:

import { createServer } from "http";

const customStream = new MyCustomReadableStream();

customStream.on('data', (chunk) => {
  console.log('Received chunk of data:', chunk.toString());
});

setTimeout(() => {
    customStream.on('data', (chunk) => {
        console.log('1Received chunk of data:', chunk.toString());
    });
}, 1000)

customStream.on('end', () => {
  console.log('Custom stream reading is complete.');
});


createServer().listen(0)

/*
// output:
Received chunk of data: data1
Received chunk of data: data2
Received chunk of data: data3
Custom stream reading is complete.
*/

Situation 4:

import { createServer } from "http";

const customStream = new MyCustomReadableStream();

customStream.on('data', (chunk) => {
  console.log('Received chunk of data:', chunk.toString());
});

customStream.on('data', (chunk) => {
    console.log('1Received chunk of data:', chunk.toString());
});

customStream.on('end', () => {
  console.log('Custom stream reading is complete.');
});


createServer().listen(0)

/*
// output:
Received chunk of data: data1
1Received chunk of data: data1
Received chunk of data: data2
1Received chunk of data: data2
Received chunk of data: data3
1Received chunk of data: data3
Custom stream reading is complete.
*/

I can't understand the difference between these four forms. Is stream reading a consuming process?


Solution

  • stream.Readable extends the events.EventEmitter class. When this.emit('data', chunk) is called, the EventEmitter looks up the currently subscribed listeners for that event (source code link) and then executes those callbacks with the given arguments. Both the lookup and the calls happen synchronously.

    This means the Readable won't wait for other data event subscribers. It calls its resume method as soon as the first listener subscribes to the data event, switching the stream into flowing mode (more at: Readable streams -> Two reading modes). Later subscriptions won't trigger a read from the beginning; they receive only the chunks emitted after they subscribe, not the ones they 'missed' by subscribing late. So yes, stream reading is a consuming process: each chunk is emitted exactly once, to whichever listeners are attached at that moment, and is then gone.
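The synchronous dispatch can be seen with a plain EventEmitter (a minimal sketch, not the Readable internals): every listener that is subscribed at the moment emit() is called runs before emit() returns.

```javascript
import { EventEmitter } from 'events';

const emitter = new EventEmitter();
const calls = [];

emitter.on('data', (chunk) => calls.push(`first: ${chunk}`));
emitter.on('data', (chunk) => calls.push(`second: ${chunk}`));

// emit() looks up the listeners registered right now and invokes
// them one after another, synchronously, before it returns.
emitter.emit('data', 'data1');

console.log(calls); // ['first: data1', 'second: data1']
```

A listener added after this emit() call would never see 'data1', which is exactly what happens to the late subscriber in Situation 3.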
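If you want a late subscriber to receive every chunk, one option is to pause the stream explicitly before the first listener drains it. This is a sketch based on Situation 3's code; it relies on the documented behavior that adding a 'data' listener does not auto-resume a stream that was explicitly paused.

```javascript
import { Readable } from 'stream';

// Same custom stream as in the question.
class MyCustomReadableStream extends Readable {
  constructor() {
    super();
    this.data = ['data1', 'data2', 'data3'];
  }

  _read() {
    const chunk = this.data.shift();
    if (chunk) {
      this.push(chunk);
    } else {
      this.push(null);
    }
  }
}

const customStream = new MyCustomReadableStream();
const first = [];
const second = [];

customStream.on('data', (chunk) => first.push(chunk.toString()));

// Attaching the 'data' listener switched the stream into flowing mode,
// but delivery is deferred to a later tick, so pausing here happens
// before any chunk has been emitted.
customStream.pause();

setTimeout(() => {
  // Adding a listener does not auto-resume an explicitly paused stream,
  // so we resume by hand; now both listeners receive every chunk.
  customStream.on('data', (chunk) => second.push(chunk.toString()));
  customStream.resume();
}, 1000);

customStream.on('end', () => {
  console.log('first:', first);
  console.log('second:', second);
});
```

Both arrays end up as ['data1', 'data2', 'data3']: pausing kept the chunks buffered until the second subscriber was in place.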