I am implementing an upload endpoint using Express and Busboy. The code works like a charm. I am now trying to implement resumable uploads. For example, if a user is uploading a 10 GB or 50 GB file and their internet connection drops, then the next time they upload the same file it should resume from where it stopped.
I do understand that I need to implement another endpoint that tells the client how many bytes have been uploaded so far, so that the client can send only the remaining bytes.
I am not sure how to proceed from here, because the first problem I am facing is that when the upload happens, Express writes the temp file to the tmp directory of the OS. Is it possible to write the temp file to my current script directory instead?
Following is my upload endpoint code.
/**
 * POST /upload — streams the uploaded multipart file part to a file next to
 * this script and replies with the stored file name once it is on disk.
 *
 * Replies 400 when the part carries no usable file name, and forwards
 * write-stream or parser errors to Express via `next(err)`.
 */
router.post("/upload", (req, res, next) => {
  const busboy = new Busboy({ headers: req.headers });

  busboy.on("file", (fieldname, file, filename) => {
    // The file name is chosen by the client. Keep only its last path segment
    // so a value such as "../../etc/passwd" cannot escape this directory.
    const safeName = path.basename(String(filename || ""));
    if (!safeName || safeName === "." || safeName === "..") {
      file.resume(); // discard the part so the parser can run to completion
      if (!res.headersSent) {
        res.status(400).send("Invalid file name");
      }
      return;
    }

    const filepath = path.join(__dirname, safeName);
    const writeStream = fs.createWriteStream(filepath);
    let failed = false;

    writeStream.on("error", (err) => {
      failed = true;
      // Keep draining the part; an unconsumed file stream would stall parsing.
      file.unpipe(writeStream);
      file.resume();
      if (!res.headersSent) {
        next(err);
      }
    });

    writeStream.on("close", () => {
      // Reply only for a successful write, and only once per request.
      if (failed || res.headersSent) {
        return;
      }
      res.send(safeName);
    });

    file.pipe(writeStream);
  });

  busboy.on("error", (err) => {
    if (!res.headersSent) {
      next(err);
    }
  });

  req.pipe(busboy);
});
Answering my own question.
I have managed to crack the solution. Basically, you need to resume uploading the file from the point where the last upload stopped. I am not sure if this is the best way of handling it, but it does the job for me.
server.js
const express = require("express");
const busboy = require("connect-busboy");
const path = require("path");
const fs = require("fs");
const cors = require("cors");
const app = express(); // Initialize the express web server
app.use(cors());

// Insert the busboy middleware exactly once, so the options given here are
// the ones used to build req.busboy.
app.use(
  busboy({
    highWaterMark: 2 * 1024 * 1024, // Set 2MiB buffer
  })
);

// Directory that receives the uploaded files.
const uploadPath = path.join(__dirname, "upload_data");
// fs.createWriteStream does not create missing directories, so make sure the
// target exists before the first upload arrives.
fs.mkdirSync(uploadPath, { recursive: true });

// In-memory map of upload id -> stored file name (lost on restart).
// Created without a prototype so client-chosen ids such as "constructor"
// never resolve to inherited members.
const database = Object.create(null);
// This endpoint tells the client how many bytes have already been uploaded
// for the given file id. It replies { bytes: 0 } when the id is unknown or
// the stored file cannot be read (meaning the file is new).
app.route("/:id").get((req, res, next) => {
  const fileId = req.params.id;

  // Only honour ids that were explicitly registered; an own-property check
  // keeps inherited keys (e.g. "constructor") from being treated as uploads.
  const isKnown = Object.prototype.hasOwnProperty.call(database, fileId);
  const dbFileName = isKnown ? database[fileId] : undefined;
  if (typeof dbFileName !== "string" || dbFileName === "") {
    return res.json({ bytes: 0 });
  }

  const completeFilePath = path.join(uploadPath, dbFileName);
  // fs.stat needs no file descriptor (so none can leak) and does not block
  // the event loop while the size is looked up.
  fs.stat(completeFilePath, (error, fileStat) => {
    if (error) {
      console.error(error);
      return res.json({ bytes: 0 });
    }
    return res.json({ bytes: fileStat.size });
  });
});
// Handle the upload post request.
//
// Request headers read here:
//   x-file-id    - client-chosen id used as the key into `database`
//   x-start-byte - offset of the first byte carried by this request
//   x-file-size  - total size of the file being uploaded
//
// Replies "File already uploaded" when the start offset is at or past the
// file size, "ok" once every file part has been flushed to disk, 400 for an
// unusable request, 409 when the resume offset does not match the bytes
// already stored, and 500 when writing to disk fails.
app.route("/upload").post((req, res, next) => {
  const xFileId = req.headers["x-file-id"];
  const xStartByte = parseInt(req.headers["x-start-byte"], 10);
  const xFileSize = parseInt(req.headers["x-file-size"], 10);
  if (xStartByte >= xFileSize) {
    return res.json("File already uploaded");
  }
  // Guard against requests for which no multipart parser was attached.
  if (!req.busboy) {
    return res.status(400).json("Expected a multipart/form-data request");
  }

  let pendingWrites = 0; // write streams that have not been flushed yet
  let parsingDone = false; // set once busboy reports "finish"
  let failure = null; // first { status, message } recorded for this request

  const fail = (status, message) => {
    if (!failure) {
      failure = { status, message };
    }
  };

  // Reply only when parsing is over AND every write stream has settled, so a
  // follow-up GET /:id sees the final size on disk.
  const respondIfDone = () => {
    if (!parsingDone || pendingWrites > 0 || res.headersSent) {
      return;
    }
    if (failure) {
      res.status(failure.status).json(failure.message);
      return;
    }
    res.json("ok");
  };

  req.busboy.on("file", (fieldname, file, filename) => {
    file.on("error", (e) => console.log("file.on.error", e));
    file.on("limit", (e) => console.log("Limit reached", e));

    // Reuse the name recorded for this id, otherwise take the client's name.
    // Either way keep only the last path segment: the name is client
    // controlled and must not be able to point outside of uploadPath.
    const isKnown = Object.prototype.hasOwnProperty.call(database, xFileId);
    const requestedName = isKnown ? database[xFileId] : filename;
    const storedName = path.basename(String(requestedName || ""));
    if (!storedName || storedName === "." || storedName === "..") {
      fail(400, "Invalid file name");
      file.resume(); // an unconsumed part would keep busboy from finishing
      return;
    }
    database[xFileId] = storedName;

    const completeFilePath = path.join(uploadPath, storedName);
    const isResume = Boolean(xStartByte);

    if (isResume) {
      // Appending is only correct when the client continues exactly where
      // the stored file ends; anything else would corrupt the file.
      let existingBytes = 0;
      try {
        existingBytes = fs.statSync(completeFilePath).size;
      } catch (err) {
        existingBytes = 0; // nothing readable on disk yet
      }
      if (existingBytes !== xStartByte) {
        fail(
          409,
          `Offset mismatch: ${existingBytes} bytes stored, x-start-byte was ${xStartByte}`
        );
        file.resume();
        return;
      }
    }

    console.log(`Upload of '${storedName}' started`);
    console.log(isResume ? "APPEND Mode" : "WRITE Mode");

    // Create a write stream of the new file ("a" appends, "w" truncates)
    const fstream = fs.createWriteStream(completeFilePath, {
      flags: isResume ? "a" : "w",
    });

    pendingWrites += 1;
    let settled = false;
    let interrupted = false;
    const settle = () => {
      if (settled) {
        return;
      }
      settled = true;
      pendingWrites -= 1;
      respondIfDone();
    };

    fstream.on("error", (err) => {
      console.log("fileStream error>>>>>", err);
      fail(500, "Failed to store the uploaded file");
      // Keep draining the part so the parser can still reach "finish".
      file.unpipe(fstream);
      file.resume();
      settle();
    });

    // On finish of the upload
    fstream.on("close", () => {
      console.log(
        interrupted
          ? `Upload of '${storedName}' interrupted`
          : `Upload of '${storedName}' finished`
      );
      settle();
    });

    // If the client disconnects mid-upload the part never ends; close the
    // write stream so its descriptor is released. The bytes written so far
    // stay on disk for the next resume.
    req.on("close", () => {
      if (!req.complete && !settled) {
        interrupted = true;
        file.unpipe(fstream);
        fstream.end();
      }
    });

    // Pipe it trough
    file.pipe(fstream);
  });

  req.busboy.on("finish", () => {
    parsingDone = true;
    respondIfDone();
  });

  req.busboy.on("error", (err) => {
    console.log(`Busboy error`, err);
    // "finish" may never fire after a parser error, so answer right away.
    if (!res.headersSent) {
      res.status(400).json("Malformed upload request");
    }
  });

  req.pipe(req.busboy); // Pipe it trough busboy
});
// Port the server listens on; the upload client addresses localhost:6969.
const PORT = 6969;
// The log line is derived from PORT so the message cannot drift from the
// port that is actually bound.
app.listen(PORT, () => console.log(`listening on ${PORT}`));
client.js
var request = require("request");
var fs = require("fs");
var path = require("path");
// Name of the file to upload; it is resolved relative to this script's
// directory. Never reassigned, hence const.
const filebasename = "35gb.zip";
const filePath = path.join(__dirname, filebasename);
/**
 * Get the information about the file at `filePath`: its size and an id that
 * is built from the file name, the size and the last-modified time.
 *
 * @returns {{fileId: string, size: number}|undefined} The file info, or
 *   undefined when the file cannot be stat'ed (the error is logged).
 */
function getFileInfo() {
  try {
    // statSync reads the metadata by path, so no file descriptor is opened
    // that would have to be closed afterwards.
    const { size, mtimeMs } = fs.statSync(filePath);
    return {
      fileId: `${filebasename}-${size}-${mtimeMs}`,
      size,
    };
  } catch (error) {
    console.error(error);
    return undefined;
  }
}
const fileInfo = getFileInfo();
if (!fileInfo) {
  // getFileInfo() returns undefined when the file cannot be read. Fail with
  // a clear message instead of a TypeError from destructuring undefined.
  throw new Error(`Cannot read '${filePath}', nothing to upload`);
}
const { fileId, size } = fileInfo;
/**
 * Send an API request to the server asking how many bytes of this file have
 * already been uploaded (if any).
 *
 * @returns {Promise<*>} Resolves with the `bytes` value of the server's JSON
 *   reply; rejects when the request fails or the reply is not valid JSON.
 */
function info() {
  // The id embeds the file name, which may contain characters that are not
  // valid inside a URL path segment (spaces, "#", "?", "/").
  const url = `http://localhost:6969/${encodeURIComponent(fileId)}`;
  const options = {
    method: "GET",
    url,
    timeout: 200000,
    headers: {
      "Content-Type": "application/json",
    },
  };
  return new Promise((resolve, reject) => {
    request(options, (err, res, body) => {
      if (err) {
        console.log(err);
        return reject(err);
      }
      try {
        // Parse inside try/catch: an exception thrown from this callback
        // would not reject the promise, it would crash the process.
        const { bytes } = JSON.parse(body);
        resolve(bytes);
      } catch (parseError) {
        reject(parseError);
      }
    });
  });
}
/**
 * Send the upload request, resuming from the offset reported by the server
 * and retrying until one attempt gets a response.
 *
 * Each attempt asks the server (via info()) how many bytes it already holds
 * and streams the file from that offset. If anything fails (EPIPE, timeout,
 * server unreachable while asking for the offset, ...) the attempt is
 * repeated after a short pause.
 *
 * @param {number} [retryDelayMs=1000] Pause between two attempts.
 * @returns {Promise<void>} Resolves once the server has answered an attempt.
 */
async function upload(retryDelayMs = 1000) {
  const url = "http://localhost:6969/upload";

  // Runs a single upload attempt and resolves with the response body.
  const attempt = async () => {
    const bytesAlreadyUploaded = await info();
    const uploadStream = fs.createReadStream(filePath, {
      start: bytesAlreadyUploaded, // this will be 0 incase file is new
      highWaterMark: 2 * 1024 * 1024,
    });
    const options = {
      method: "POST",
      url,
      timeout: 200000,
      headers: {
        "Content-Type": "multipart/form-data",
        "x-file-id": fileId,
        "x-start-byte": bytesAlreadyUploaded,
        "x-file-size": size,
        "x-file-name": filebasename,
      },
      formData: {
        image: uploadStream,
      },
    };
    return new Promise((resolve, reject) => {
      request(options, (err, res, body) => {
        if (err) {
          // Release the file handle of the failed attempt; the next attempt
          // opens its own stream at the new offset.
          uploadStream.destroy();
          return reject(err);
        }
        resolve(body);
      });
    });
  };

  // A loop (instead of recursing from the request callback) keeps every
  // failure inside this function, so none ends up as an unhandled rejection.
  for (;;) {
    try {
      const body = await attempt();
      console.log("body", typeof body, body);
      return;
    } catch (err) {
      // Basically if an error occurs, EPIPE or Connection timed out or any
      // other error, we resume uploading from the point where it was last
      // uploaded. The pause avoids a tight loop while the network is down.
      console.log(`Error ${err.code}. Resuming upload...`);
      await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
    }
  }
}
// Entry point: start the upload and report a failure instead of leaving the
// promise returned by upload() unobserved.
(async () => {
  try {
    await upload();
  } catch (err) {
    console.error(err);
    process.exitCode = 1;
  }
})();