Search code examples
node.jsamazon-s3etlnode-modulesnode-streams

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close in Node Pipeline stream


I am using the stream.pipeline functionality from Node to upload some data to S3. The basic idea I'm implementing is pulling files from a request and writing them to S3. I have one pipeline that pulls zip files and writes them to S3 successfully. However, I want my second pipeline to make the same request, but unzip and write the unzipped files to S3. The pipeline code looks like the following:

pipeline(request.get(...), s3Stream(zipFileWritePath)),
pipeline(request.get(...), new unzipper.Parse(), etl.map(entry => entry.pipe(s3Stream(createWritePath(writePath, entry)))))

The s3Stream function looks like so:

function s3Stream(file) {
    const pass = new stream.PassThrough()
    s3Store.upload(file, pass)
    return pass
}

The first pipeline works well, and is currently operating greatly in production. However, when adding the second pipeline, I get the following error:

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at Parse.onclose (internal/streams/end-of-stream.js:56:36)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at Parse.<anonymous> (/node_modules/unzipper/lib/parse.js:28:10)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at finishMaybe (_stream_writable.js:641:14)
at afterWrite (_stream_writable.js:481:3)
at onwrite (_stream_writable.js:471:7)
at /node_modules/unzipper/lib/PullStream.js:70:11
at afterWrite (_stream_writable.js:480:3)
at process._tickCallback (internal/process/next_tick.js:63:19)

Any idea what could be causing this or solutions to resolve this would be greatly appreciated!


Solution

  • TL;DR

    When using a pipeline you accept to consume the readable stream fully, you don't want anything stopping before the readable ends.

    Deep dive

    After some time working with those shenanigans here is some more usefull informations.

    import stream from 'stream'
    
    const s1 = new stream.PassThrough()
    const s2 = new stream.PassThrough()
    const s3 = new stream.PassThrough()
    
    s1.on('end', () => console.log('end 1'))
    s2.on('end', () => console.log('end 2'))
    s3.on('end', () => console.log('end 3'))
    s1.on('close', () => console.log('close 1'))
    s2.on('close', () => console.log('close 2'))
    s3.on('close', () => console.log('close 3'))
    
    stream.pipeline(
        s1,
        s2,
        s3,
        async s => { for await (_ of s) { } },
        err => console.log('end', err)
    )
    

    now if i call s2.end() it will close all parents

    end 2
    close 2
    end 3
    close 3
    

    pipeline is the equivalent of s3(s2(s1)))

    but if i call s2.destroy() it print and destroy everything, this is your problem here a stream is destroyed before it ends normally, either an error or a return/break/throws in an asyncGenerator/asyncFunction

    close 2
    end Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
        at PassThrough.onclose (internal/streams/end-of-stream.js:117:38)
        at PassThrough.emit (events.js:327:22)
        at emitCloseNT (internal/streams/destroy.js:81:10)
        at processTicksAndRejections (internal/process/task_queues.js:83:21) {
      code: 'ERR_STREAM_PREMATURE_CLOSE'
    }
    close 1
    close 3
    

    You must not let one of the streams without a way to catch their errors

    stream.pipeline() leaves dangling event listeners on the streams after theallback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.

    node source (14.4)

      const onclose = () => {
        if (readable && !readableEnded) {
          if (!isReadableEnded(stream))
            return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
        }
        if (writable && !writableFinished) {
          if (!isWritableFinished(stream))
            return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
        }
        callback.call(stream);
      };