A stream
is an abstract interface for working with streaming data in Node.js. The node:stream module provides an API for implementing the stream interface.
When I was learning about NodeJs streams
, I ran into some issues where I wasn't sure if a stream was a consuming process or not
Such as, I hava a public code:
import { Readable } from 'stream'
class MyCustomReadableStream extends Readable {
constructor() {
super();
this.data = ['data1', 'data2', 'data3'];
}
_read() {
const chunk = this.data.shift();
if (chunk) {
this.push(chunk);
} else {
this.push(null);
}
}
}
And There are some different situation with different code about deal with stream
Situation 1:
import { createServer } from "http";
const customStream = new MyCustomReadableStream();
customStream.on('data', (chunk) => {
console.log('Received chunk of data:', chunk.toString());
});
customStream.on('end', () => {
console.log('Custom stream reading is complete.');
});
createServer().listen(0)
/*
// Output:
Received chunk of data: data1
Received chunk of data: data2
Received chunk of data: data3
Custom stream reading is complete.
*/
Situation 2:
import { createServer } from "http";
const customStream = new MyCustomReadableStream();
setTimeout(() => {
customStream.on('data', (chunk) => {
console.log('1Received chunk of data:', chunk.toString());
});
}, 1000)
customStream.on('end', () => {
console.log('Custom stream reading is complete.');
});
createServer().listen(0)
/*
// output:
1Received chunk of data: data1
1Received chunk of data: data2
1Received chunk of data: data3
Custom stream reading is complete.
*/
Situation 3:
import { createServer } from "http";
const customStream = new MyCustomReadableStream();
customStream.on('data', (chunk) => {
console.log('Received chunk of data:', chunk.toString());
});
setTimeout(() => {
customStream.on('data', (chunk) => {
console.log('1Received chunk of data:', chunk.toString());
});
}, 1000)
customStream.on('end', () => {
console.log('Custom stream reading is complete.');
});
createServer().listen(0)
/*
// output:
Received chunk of data: data1
Received chunk of data: data2
Received chunk of data: data3
Custom stream reading is complete.
*/
Situation 4:
import { createServer } from "http";
const customStream = new MyCustomReadableStream();
customStream.on('data', (chunk) => {
console.log('Received chunk of data:', chunk.toString());
});
customStream.on('data', (chunk) => {
console.log('1Received chunk of data:', chunk.toString());
});
customStream.on('end', () => {
console.log('Custom stream reading is complete.');
});
createServer().listen(0)
/*
// output:
Received chunk of data: data1
1Received chunk of data: data1
Received chunk of data: data2
1Received chunk of data: data2
Received chunk of data: data3
1Received chunk of data: data3
Custom stream reading is complete.
Custom stream reading is complete.
*/
I can't understand the difference of these four forms, Is stream reading a consuming process?
stream.Readable
extends the events.EventEmitter
class. When this.emit('data', chunk)
is called, the Event Emitter will look for the subscribed event listeners (source code link). Then it will execute those callbacks with the given arguments. Looking for the callbacks and calling them happens synchronously.
This means that the Readable
won't wait for the other data
event subscribers. It will immediately call its resume
method as soon as a listener subscribes to the data
event (more at: Readable streams -> Two reading modes). Later subscriptions won't trigger a read from the beginning; they will receive later chunks but not the ones they 'missed' by subscribing late.