I'm working on my custom transform stream. Since I handle buffering and caching manually, I need to ensure that no chunks will be automatically sent when re-piping it. To achieve that I should clear/reset the internal buffer (the write queue) before attempting to call .pipe()
.
Since my stream logic is quite complicated, I made this simplified example to help you better understand what I'm trying to solve:
const { Readable, Writable } = require('stream')
const wait = ms => new Promise(res => setTimeout(res, ms))
;(async () => {
const readable = new Readable()
readable._read = () => {}
const writable = new Writable()
writable._write = (chunk, _encoding, callback) => {
console.log(chunk.toString())
callback()
}
readable.pipe(writable)
await wait(100) // This small delay replaces listening to the pipe/unpipe events
readable.push('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
console.log('unpipe')
readable.unpipe(writable)
await wait(100)
readable.push('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
// readable.clearInternalBufferOrWhatever() // ???
console.log('pipe')
readable.pipe(writable)
await wait(100)
readable.push('ccccccccccccccccccccccccccccccc')
})()
// => aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa (Expected ✔)
// unpipe
// pipe
// bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb (Unexpected ❌)
// ccccccccccccccccccccccccccccccc (Expected ✔)
As you can see, the writable stream has successfully received the second chunk ('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'
) and that's what I'm trying to avoid. Any ideas?
I figured it out! Streams have an internal property called _readableState
which contains the internal buffer. It's possible to clear it with just 2 lines of code:
readable._readableState.buffer.clear()
readable._readableState.length = 0
Note that _readableState
isn't safe since it is internal and undocumented and could get breaking changes at anytime. However, this solution is confirmed to work on node v8.10.0 and v14.17.2.
Here is a full working example:
const { Readable, Writable } = require('stream')
const wait = ms => new Promise(res => setTimeout(res, ms))
;(async () => {
const readable = new Readable()
readable._read = () => {}
const writable = new Writable()
writable._write = (chunk, _encoding, callback) => {
console.log(chunk.toString())
callback()
}
readable.pipe(writable)
await wait(100)
readable.push('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
console.log('unpipe')
readable.unpipe(writable)
await wait(100)
readable.push('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
readable._readableState.buffer.clear()
readable._readableState.length = 0
console.log('pipe')
readable.pipe(writable)
await wait(100)
readable.push('ccccccccccccccccccccccccccccccc')
})()
// => aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
// unpipe
// pipe
// ccccccccccccccccccccccccccccccc