javascript, node.js, amazon-s3, aws-sdk, koa

What is the correct way to upload a large koa request body to AWS S3?


I'm building an application backend. Clients post a file as the request body to the server, and the server then uploads the file to AWS S3. The server uses Node.js and the koa web framework.

If I use raw-body to read the post body into a buffer, then when the file is large the buffer is also large, which causes an out-of-memory error.
If I pass ctx.req (an IncomingMessage object) directly to S3.putObject, the AWS SDK throws an error saying Cannot determine length of [object Object]; it looks like the SDK tries to determine the length of the stream before starting a multipart upload.
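
For reference, the direct-stream attempt looked roughly like the sketch below (a minimal sketch, assuming an AWS.S3 client inside a koa app; the bucket and key are placeholders):

    const Koa = require('koa');
    const AWS = require('aws-sdk');

    const app = new Koa();
    const s3 = new AWS.S3();

    app.use(async (ctx) => {
        // Naive approach: hand the raw request stream straight to putObject.
        // putObject needs a known content length, which an IncomingMessage cannot
        // provide, so the SDK fails with "Cannot determine length of [object Object]".
        await s3.putObject({
            Bucket: 'my-bucket',        // placeholder
            Key: 'uploads/some-file',   // placeholder
            Body: ctx.req               // IncomingMessage: length unknown
        }).promise();
        ctx.status = 200;
    });

    app.listen(3000);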

AWS SDK version 2.383.0 (currently the latest)
Node.js 10.14.2

For now, I have written a function that reads from the IncomingMessage as a stream, waits for data events to fill up a large buffer (16 MB), and then does a multipart upload to S3. This solves the problem well, but I'm still looking for a better solution.


Solution

  • After months of running, I think my final solution is stable and reliable.

    The main concept is to read from the IncomingMessage stream into a buffer; once the buffer reaches the part size, upload the current part to S3, then keep reading the stream until it ends.

    const Log4js = require('log4js');
    const Crypto = require('crypto');

    // Default part size in MB when the caller does not pass partSizeInByte (the 16 MB buffer mentioned above).
    const uploadBufferMB = 16;

    const uploaderLogger = Log4js.getLogger('customUploader');
    function customMultiPartUpload(s3, bucket, key, incomingMessage, partSizeInByte) {
        return new Promise((resolve) => {
            partSizeInByte = partSizeInByte || uploadBufferMB * 1024 * 1024;
            uploaderLogger.debug(`part size is ${partSizeInByte}`);
    
            let uploadId = null;
            let partNumber = 0;
            let parts = [];
            let fileSize = 0;
            let reserveBuffer = Buffer.alloc(0);
            const sendBuffer = Buffer.alloc(partSizeInByte);
            const md5Hash = Crypto.createHash('md5');
    
            // Lazily create the multipart upload on the first call, then upload one part from uploadBuffer.
            const doUpload = async (uploadBuffer) => {
                if (!uploadId) {
                    uploaderLogger.debug('multipart upload not initialized');
                    const createData = await s3.createMultipartUpload({
                        Bucket: bucket,
                        Key: key
                    }).promise();
                    uploadId = createData.UploadId;
                    uploaderLogger.debug(`uploadId ${uploadId}`);
    
                    partNumber = 0;
                }
                fileSize += uploadBuffer.length;
                uploaderLogger.debug(`buffer length ${uploadBuffer.length}, total ${fileSize}`);
    
                partNumber += 1;
                uploaderLogger.debug(`part number ${partNumber}`);
    
                md5Hash.update(uploadBuffer);
    
                const partData = await s3.uploadPart({
                    Bucket: bucket,
                    Key: key,
                    PartNumber: partNumber,
                    UploadId: uploadId,
                    Body: uploadBuffer
                }).promise();
                parts.push({
                    PartNumber: partNumber,
                    ETag: partData.ETag
                });
                uploaderLogger.debug(`etag ${partData.ETag}`);
            };
    
            // Accumulate request chunks; while the stream is paused, flush every full part to S3.
            incomingMessage.on('data', async (chunkBuffer) => {
                incomingMessage.pause();
    
                reserveBuffer = Buffer.concat([ reserveBuffer, chunkBuffer ]);
                if (reserveBuffer.length > partSizeInByte) {
                    do {
                        reserveBuffer.copy(sendBuffer, 0, 0, partSizeInByte);
                        reserveBuffer = reserveBuffer.slice(partSizeInByte);
                        await doUpload(sendBuffer);
                    } while (reserveBuffer.length > partSizeInByte);
                }
    
                incomingMessage.resume();
            });
    
            // Upload any remaining bytes, complete the multipart upload, and resolve with size and MD5.
            incomingMessage.on('end', async () => {
                uploaderLogger.debug('stream end');
    
                if (reserveBuffer.length > 0) {
                    await doUpload(reserveBuffer);
                }
    
                if (uploadId) {
                    uploaderLogger.debug('uploadId not null');
                    await s3.completeMultipartUpload({
                        Bucket: bucket,
                        Key: key,
                        UploadId: uploadId,
                        MultipartUpload: {
                            Parts: parts
                        }
                    }).promise();
                    uploaderLogger.debug('multipart upload complete');
                }
    
                const hash = md5Hash.digest('hex');
    
                resolve({
                    size: fileSize,
                    hash: hash
                });
                uploaderLogger.debug(`return file size ${fileSize}, hash ${hash}`);
            });
        });
    }
    

    Adjust partSizeInByte to fit your server's memory usage: a part size that is too large may cause OOM when the server is handling many requests, while a part size that is too small may fall below the S3 minimum part size (5 MB for all but the last part).
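
    For completeness, here is a minimal sketch of how the function above might be called from a koa handler (the bucket name, key, and the 8 MB part size are placeholders, not values from my setup):

    const Koa = require('koa');
    const AWS = require('aws-sdk');

    const app = new Koa();
    const s3 = new AWS.S3();

    app.use(async (ctx) => {
        // ctx.req is the underlying IncomingMessage; stream it straight to S3.
        const { size, hash } = await customMultiPartUpload(
            s3, 'my-bucket', `uploads/${Date.now()}`, ctx.req, 8 * 1024 * 1024);
        ctx.body = { size, hash };
    });

    app.listen(3000);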