
How to properly clear the internal buffer of a readable stream in node?


I'm working on a custom transform stream. Since I handle buffering and caching manually, I need to make sure that no chunks are sent automatically when the stream is re-piped. To achieve that, I need to clear/reset its internal buffer (the readable side's queue of pushed chunks, not the write queue) before calling .pipe() again.
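For reference, a Transform keeps two separate queues: data written while an earlier chunk is still being transformed waits on the writable (input) side, and whatever _transform() pushes waits on the readable (output) side until something consumes it. The sketch below uses a hypothetical identity transform and the public readableLength/writableLength getters (available since Node 9.4) to show that, once a chunk has been transformed and nothing is reading, it sits on the readable side. That is the queue a later pipe() flushes.

```javascript
const { Transform } = require('stream')

// Hypothetical identity transform: passes every chunk through unchanged.
const transform = new Transform({
  transform(chunk, _encoding, callback) {
    callback(null, chunk)
  }
})

// Nothing consumes the output, so the transformed chunk stays queued.
transform.write('xyz')

// readableLength: bytes waiting on the readable (output) side.
// writableLength: bytes waiting on the writable (input) side.
console.log(transform.readableLength, transform.writableLength) // 3 0
```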

Since my stream logic is quite complicated, I made this simplified example to help you better understand what I'm trying to solve:

const { Readable, Writable } = require('stream')
const wait = ms => new Promise(res => setTimeout(res, ms))

;(async () => {
    const readable = new Readable()
    readable._read = () => {}

    const writable = new Writable()
    writable._write = (chunk, _encoding, callback) => {
        console.log(chunk.toString())
        callback()
    }

    readable.pipe(writable)
    await wait(100) // This small delay replaces listening to the pipe/unpipe events 

    readable.push('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')

    console.log('unpipe')
    readable.unpipe(writable)
    await wait(100)

    readable.push('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')

    // readable.clearInternalBufferOrWhatever() // ???

    console.log('pipe')
    readable.pipe(writable)
    await wait(100)

    readable.push('ccccccccccccccccccccccccccccccc')
})()

// => aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa (Expected ✔)
//    unpipe
//    pipe
//    bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb (Unexpected ❌)
//    ccccccccccccccccccccccccccccccc (Expected ✔)

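As you can see, the writable stream still receives the second chunk ('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb'). It was pushed while the streams were unpiped, so it waited in the readable's internal buffer and was flushed as soon as pipe() resumed the stream. That's exactly what I'm trying to avoid. Any ideas?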
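The cause can be confirmed without timers. This minimal synchronous sketch (assuming Node 9.4+ for readableLength) mirrors the example above: after unpipe() removes the last destination the source is paused, so a chunk pushed at that point is queued in the readable's internal buffer instead of being delivered. The next pipe() resumes the stream and flushes that queue. The writes counter is only there for illustration.

```javascript
const { Readable, Writable } = require('stream')

const readable = new Readable({ read() {} }) // no-op _read, as in the question
let writes = 0
const writable = new Writable({
  write(_chunk, _encoding, callback) {
    writes++ // count deliveries instead of logging them
    callback()
  }
})

readable.pipe(writable)
readable.unpipe(writable) // last destination removed, so the source is paused
readable.push('b'.repeat(31))

// The chunk was not delivered; it is waiting in the readable's internal buffer.
console.log(writes, readable.isPaused(), readable.readableLength) // 0 true 31
```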


Solution

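  • I figured it out! Readable streams (including the readable side of a Duplex or Transform) have an internal property called _readableState that holds the internal buffer. It can be cleared with two lines of code: empty the buffer itself, then reset _readableState.length (the number of buffered bytes, or objects in object mode) so the stream's bookkeeping matches the now-empty buffer: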

    readable._readableState.buffer.clear()
    readable._readableState.length = 0
    

    Note that relying on _readableState isn't safe: it is internal and undocumented, and it can change in a breaking way in any release. That said, this solution has been confirmed to work on Node v8.10.0 and v14.17.2.

    Here is a full working example:

    const { Readable, Writable } = require('stream')
    const wait = ms => new Promise(res => setTimeout(res, ms))
    
    ;(async () => {
        const readable = new Readable()
        readable._read = () => {}
    
        const writable = new Writable()
        writable._write = (chunk, _encoding, callback) => {
            console.log(chunk.toString())
            callback()
        }
    
        readable.pipe(writable)
        await wait(100)
        readable.push('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
    
        console.log('unpipe')
        readable.unpipe(writable)
        await wait(100)
    
        readable.push('bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb')
    
        readable._readableState.buffer.clear()
        readable._readableState.length = 0
    
        console.log('pipe')
        readable.pipe(writable)
        await wait(100)
    
        readable.push('ccccccccccccccccccccccccccccccc')
    })()
    
    // => aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
    //    unpipe
    //    pipe
    //    ccccccccccccccccccccccccccccccc
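If you'd rather not touch internals, one possible alternative (a sketch, not something confirmed in the answer above) is to drain the queue through the public read() method while the stream is unpiped and therefore paused. discardBuffered is a hypothetical helper, not a Node API. It assumes nothing else is listening for 'data' (read() emits 'data' for each chunk it returns) and that _read() does not push new data synchronously, as in the example where _read is a no-op. With a source that refills the buffer on every _read() call, the loop could keep consuming fresh data instead of stopping.

```javascript
const { Readable } = require('stream')

// Hypothetical helper (not part of Node's API): discards everything currently
// queued in a paused readable's internal buffer using only public methods.
// Assumes no 'data' listeners are attached and that _read() does not push
// synchronously; otherwise read() could keep returning freshly produced data.
function discardBuffered(readable) {
  // In paused mode, read() with no argument returns the buffered data
  // (one object per call in object mode), or null once the queue is empty.
  while (readable.read() !== null) {
    // Intentionally drop each chunk.
  }
}

const readable = new Readable({ read() {} }) // no-op _read, as in the question
readable.pause() // stands in for the paused state left behind by unpipe()
readable.push('b'.repeat(31))

console.log(readable.readableLength) // 31
discardBuffered(readable)
console.log(readable.readableLength) // 0
```

Under these assumptions, calling discardBuffered(readable) where the question has the // readable.clearInternalBufferOrWhatever() placeholder should have the same effect as the two _readableState lines, because unpipe() has already paused the stream.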