Search code examples
Tags: node.js, file-upload, busboy

Node.js: resumable file upload using busboy


I am implementing an upload endpoint using Express and busboy, and the code works like a charm. Now I am trying to implement resumable uploads. For example, if a user is uploading a 10 GB or 50 GB file and their internet connection drops, the next time they upload the same file it should resume where it left off.

I understand that I need to implement another endpoint that tells the client how many bytes have been uploaded so far, so that the client can send only the remaining bytes.

I am not sure how to proceed from here. The first problem I am facing is that, when the upload happens, Express writes the temp file to the OS's tmp directory. Is it possible to store the temp file in my current script directory instead?

The following is my upload endpoint code.

// Stream each uploaded file part into this script's directory and reply with
// the stored file name once it has been written.
router.post("/upload", (req, res, next) => {
  const busboy = new Busboy({ headers: req.headers });

  busboy.on("file", (fieldname, file, filename) => {
    // The name comes from the client: keep only its last path segment so an
    // upload cannot escape (or overwrite files outside) this directory.
    const safeName = path.basename(String(filename || ""));
    if (!safeName || safeName === "." || safeName === "..") {
      file.resume(); // drain the part so busboy can keep parsing
      if (!res.headersSent) {
        res.status(400).send("Invalid file name");
      }
      return;
    }

    const writeStream = fs.createWriteStream(path.join(__dirname, safeName));
    let failed = false;

    // Without an 'error' listener a failed write would crash the process.
    writeStream.on("error", (err) => {
      failed = true;
      file.resume();
      next(err);
    });

    writeStream.on("close", () => {
      // Several file parts may arrive; only one response may be sent.
      if (!failed && !res.headersSent) {
        res.send(safeName);
      }
    });

    file.pipe(writeStream);
  });

  req.pipe(busboy);
});

Solution

  • Answering my own question.

    I have managed to crack the solution. Basically, you need to resume uploading the file from where the previous upload stopped. I am not sure if this is the best way of handling it, but it does the job for me.

    server.js

    const express = require("express");
    const busboy = require("connect-busboy");
    const path = require("path");
    const fs = require("fs");
    const cors = require("cors");
    
    const app = express(); // Initialize the express web server
    app.use(cors());

    // Insert the busboy middleware exactly once, with the configured buffer size.
    app.use(
      busboy({
        highWaterMark: 2 * 1024 * 1024, // Set 2MiB buffer
      })
    );

    // Directory that holds the uploaded (possibly partial) files. It is created
    // up front so the first write stream does not fail with ENOENT.
    const uploadPath = path.join(__dirname, "upload_data");
    fs.mkdirSync(uploadPath, { recursive: true });

    // In-memory map of client file id -> stored file name. A prototype-less
    // object keeps ids such as "__proto__" or "constructor" from resolving to
    // Object.prototype members. Its contents are lost when the process exits.
    const database = Object.create(null);
    
    // Report how many bytes of a file are already stored, so the client can resume
    // from that offset. Responds with { bytes: 0 } when the id is unknown or the
    // stored file cannot be read; the upload then starts from scratch.
    app.route("/:id").get((req, res) => {
      const fileId = req.params.id;
      let bytes = 0;

      // Own-property lookup, so ids like "__proto__" or "toString" do not resolve
      // to Object.prototype members.
      const dbFileName = Object.prototype.hasOwnProperty.call(database, fileId)
        ? database[fileId]
        : undefined;

      if (dbFileName) {
        try {
          // statSync reads the size without leaving a file descriptor open.
          bytes = fs.statSync(path.join(uploadPath, dbFileName)).size;
        } catch (error) {
          console.error(error);
        }
      }

      return res.json({ bytes });
    });
    
    
    
    // Handle the upload post request.
    //
    // Headers sent by the client:
    //   x-file-id    - id used to look up the stored file name in `database`
    //   x-start-byte - offset of the first byte carried by this request's file part
    //   x-file-size  - total size of the file in bytes
    // The file part is written into upload_data starting at x-start-byte, so a
    // resumed upload continues the stored file instead of starting over.
    app.route("/upload").post((req, res) => {
      const xFileId = req.headers["x-file-id"];
      const xStartByte = parseInt(req.headers["x-start-byte"], 10);
      const xFileSize = parseInt(req.headers["x-file-size"], 10);

      // Without an id nothing can be resumed later, and a NaN offset would
      // silently fall back to overwriting the stored file.
      if (
        !xFileId ||
        !Number.isInteger(xStartByte) ||
        xStartByte < 0 ||
        !Number.isInteger(xFileSize)
      ) {
        return res.status(400).json("Missing or invalid upload headers");
      }

      if (xStartByte >= xFileSize) {
        return res.json("File already uploaded");
      }

      // req.busboy is absent when the middleware did not set up a parser for this
      // request; piping into undefined would throw.
      if (!req.busboy) {
        return res.status(400).json("Expected a multipart/form-data body");
      }

      let responded = false;
      let parsingDone = false;
      let pendingWrites = 0;

      // Send at most one response, even if several events want to reply.
      const respond = (status, body) => {
        if (responded || res.headersSent) {
          return;
        }
        responded = true;
        res.status(status).json(body);
      };

      // Reply "ok" only after busboy has parsed the whole body AND every file
      // stream has closed, so success is never reported before data is on disk.
      const finishIfDone = () => {
        if (parsingDone && pendingWrites === 0) {
          respond(200, "ok");
        }
      };

      req.busboy.on("file", (fieldname, file, filename) => {
        let storedName;
        if (Object.prototype.hasOwnProperty.call(database, xFileId)) {
          storedName = database[xFileId];
        } else {
          // The name comes from the client: keep only its last path segment so
          // it cannot point outside uploadPath.
          storedName = path.basename(String(filename || ""));
          if (!storedName || storedName === "." || storedName === "..") {
            file.resume(); // drain the part so busboy can keep parsing
            respond(400, "Invalid file name");
            return;
          }
          database[xFileId] = storedName;
        }

        const completeFilePath = path.join(uploadPath, storedName);
        console.log(`Upload of '${storedName}' started`);

        // Resumed uploads write at the client's offset into the existing file
        // ("r+" does not truncate). With positioned writes, bytes resent after a
        // dropped connection overwrite identical data instead of being appended
        // a second time. New uploads (offset 0) create or truncate the file.
        let fstream;
        if (xStartByte > 0) {
          console.log("APPEND Mode");
          fstream = fs.createWriteStream(completeFilePath, {
            flags: "r+",
            start: xStartByte,
          });
        } else {
          console.log("WRITE Mode");
          fstream = fs.createWriteStream(completeFilePath, {
            flags: "w",
          });
        }
        pendingWrites += 1;

        file.on("error", (e) => console.log("file.on.error", e));

        file.on("limit", (e) => console.log("Limit reached", e));

        fstream.on("error", (err) => {
          console.log("fileStream error>>>>>", err);
          // pipe() stops reading `file` once its destination fails; drain it so
          // busboy is not stalled, and report the failure instead of "ok".
          file.unpipe(fstream);
          file.resume();
          respond(500, "Failed to store upload");
        });

        // On finish of the upload
        fstream.on("close", () => {
          console.log(`Upload of '${storedName}' finished`);
          pendingWrites -= 1;
          finishIfDone();
        });

        file.pipe(fstream);
      });

      req.busboy.on("finish", () => {
        parsingDone = true;
        finishIfDone();
      });

      req.busboy.on("error", (err) => {
        console.log(`Busboy error`, err);
        respond(500, "Malformed upload");
      });

      req.pipe(req.busboy); // Pipe it through busboy
    });
     
    
    // Start accepting requests; the client script sends its requests to this port.
    const PORT = 6969;
    app.listen(PORT, () => console.log(`listening on ${PORT}`));
    

    client.js

    var request = require("request");
    var fs = require("fs");
    var path = require("path");
    
    const filebasename = "35gb.zip";
    const filePath = path.join(__dirname, filebasename);

    // Get the information about the file: its size and an id built from name,
    // size and last-modified time. The id stays the same across runs for an
    // unchanged file, which is what the server uses to find an upload to resume.
    // Throws with a descriptive message if the file cannot be read.
    function getFileInfo() {
      let fileStat;
      try {
        // statSync needs no file descriptor, so nothing is left open.
        fileStat = fs.statSync(filePath);
      } catch (error) {
        throw new Error(`Cannot read upload source '${filePath}'`, { cause: error });
      }
      return {
        fileId: `${filebasename}-${fileStat.size}-${fileStat.mtimeMs}`,
        size: fileStat.size,
      };
    }

    const { fileId, size } = getFileInfo();
    
    // Ask the server how many bytes of this file it already has (0 for a new file).
    // Resolves with that byte count. Rejects on request errors, on a reply that is
    // not JSON, or when the reply has no valid byte count.
    function info() {
      // fileId embeds the file name, so encode it to keep it one URL path segment.
      const url = `http://localhost:6969/${encodeURIComponent(fileId)}`;

      const options = {
        method: "GET",
        url,
        timeout: 200000,
        headers: {
          "Content-Type": "application/json",
        },
      };

      return new Promise((resolve, reject) => {
        request(options, (err, res, body) => {
          if (err) {
            console.log(err);
            return reject(err);
          }

          // A throw inside this callback would escape the promise and crash the
          // process, so parse failures are turned into rejections instead.
          let bytes;
          try {
            ({ bytes } = JSON.parse(body));
          } catch (parseError) {
            return reject(parseError);
          }

          if (!Number.isInteger(bytes) || bytes < 0) {
            return reject(new Error(`Unexpected byte count from server: ${body}`));
          }
          return resolve(bytes);
        });
      });
    }
    
     
    // Send upload request.
    //
    // Uploads the file, resuming from whatever the server already holds. Any
    // error (EPIPE, connection timed out, server unreachable, ...) is logged and
    // the attempt is repeated after `retryDelayMs`; each attempt re-asks the
    // server for its current byte count. Resolves with the server's response body.
    async function upload(retryDelayMs = 1000) {
      for (;;) {
        try {
          const body = await sendUploadRequest();
          console.log("body", typeof body, body);
          return body;
        } catch (err) {
          console.log(`Error ${err.code}. Resuming upload...`);
          // Pause before retrying so an unreachable server is not hammered.
          await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
        }
      }
    }

    // One upload attempt: fetch the server's byte count, then stream the rest of
    // the file. Resolves with the response body; rejects on any request error.
    async function sendUploadRequest() {
      const bytesAlreadyUploaded = await info();
      const url = "http://localhost:6969/upload";
      const uploadStream = fs.createReadStream(filePath, {
        start: bytesAlreadyUploaded, // 0 when the file is new
        highWaterMark: 2 * 1024 * 1024,
      });

      const options = {
        method: "POST",
        url,
        timeout: 200000,
        headers: {
          "Content-Type": "multipart/form-data",
          "x-file-id": fileId,
          "x-start-byte": bytesAlreadyUploaded,
          "x-file-size": size,
          "x-file-name": filebasename,
        },
        formData: {
          image: uploadStream,
        },
      };

      return new Promise((resolve, reject) => {
        request(options, (err, res, body) => {
          if (err) {
            // Close the partially read file before the next attempt opens a new one.
            uploadStream.destroy();
            return reject(err);
          }
          return resolve(body);
        });
      });
    }
    
    // Kick off the upload. Anything that escapes upload()'s own retry handling is
    // reported here instead of surfacing as an unhandled promise rejection.
    upload().catch((err) => {
      console.error("Upload failed", err);
      process.exitCode = 1;
    });