Search code examples
javascriptnode.jspipeline

How do I construct a fie from Node Js pipeline() function from file-chunks in a loop?


So I am trying to combine a file from chunks I got earlier that are in the temp folder, and I am using pipeline for that, but here is what is happening, the loop works well for the first iteration, but doesn't execute for the rest of the iteration.

Here is the codes

import { pipeline } from "stream/promises";
import { promisify } from "util";
import fs from "fs";
import path from "path";


//// Document Buffer construction function
async function combineChunks(
  tempFolder,
  numberedFiles,
  outputPath,
  combinedFileSize
) {

/*
the tempFolder is a string arg of where the chunks are
the numberedFiles is a string array of all the chunks sorted in order
the outputPath is where the constructed file is supposed to be saved
the combinedFileSize is the expected file size
*/
  const writeStream = fs.createWriteStream(outputPath, { flags: "a" }); // Append mode

  try {
    for (const file of numberedFiles) {
      const filePath = path.join(tempFolder, file);

      // Validate file path to prevent directory traversal attacks
      if (!filePath.startsWith(tempFolder)) {
        throw new Error("Invalid file path detected"); // Reject file path if it's not within the expected directory
      }

      // Read the file chunk and write it to the output file
      await pipeline(
        fs.createReadStream(filePath), // Read stream for the chunk file
        writeStream // Write stream for the output file
      );

      // Delete the chunk file after writing
      await fs.promises.unlink(filePath);
    }

    // Close the write stream after all chunks are written
    writeStream.end();

    // Validate the combined file (file size and file type)
    const stats = fs.statSync(outputPath);
    const fileSize = stats.size;
    const allowedDocumentExtensions =
      /\.(jpg|jpeg|png|gif|bmp|svg|webp|txt|doc|docx|odt|xls|xlsx|ods|ppt|pptx|odp|pdf)$/i;

    if (fileSize !== combinedFileSize) {
      // Delete the file in the server
      await fs.promises.unlink(outputPath);
      throw new Error("File size mismatch");
    }

    if (!allowedDocumentExtensions.test(outputPath)) {
      // Delete the file in the server
      await fs.promises.unlink(outputPath);
      throw new Error("Inappropriate file type detected");
    }

    return true;
  } catch (error) {
    console.error("Error combining files:", error);
    return false;
  }
} 

Now as I explained earlier that the first iteration works pretty well, in fact if there is only one chunk (i.e the numberedFiles has only one string) the document is constructed as expected, the problem rises where there are plenty of chunks, and more interesting is that it doesn't throw and error on the console, so its gets a little hard to debug I tried searching as much as I can, cant seem to find the solution of what exactly is going wrong. I will appreciate any help provided!

I tried to search for the solution all over across the internet, cant, I didn't get any good solutions.


Solution

  •      await pipeline(
           fs.createReadStream(filePath), // Read stream for the chunk file
           writeStream // Write stream for the output file
         );
    

    This will close the stream but you can pass option to prevent it like this

    for (...) {
      // ...
      await pipeline(readStream, writeStream, { end: false })
      // ...
    }
    
    // writeStream.close()
    
    // EDIT: It appears that .close() is old way of doing and causes
    // problem in some cases, nodejs team has plans to deprecate it in future. 
    // .end() or .destroy() should be used instead.
    writeStream.end()
    

    But I would prefer using writeFile instead of pipeline because its suits perfectly for this scenario.

      const writeFD = await fs.promises.open(outputPath, 'a+') // opens a file and returns it's descriptor
    
      for (const srcFile of srcFiles) {
        const rs = fs.createReadStream(filePath)
        await fs.promises.writeFile(writeFD, rs) // write to the descriptor 
      }
    
      await writeFD.close() // close the descriptor