node.js, async-await, stream

Async/Await createReadStream "end" fires before all data is read : Node.js


I'm using fs.createReadStream to read data from an uploaded CSV file. When a data row is received, I validate it and push the validated row into an array.

The issue is that the final row is received after the end event has already fired.

I used a CSV file with four rows for the test below. All four records pass validation.

const csvData = []; // validated rows are collected here

const stream = fs.createReadStream(filePath)
    .pipe(csv.parse({ headers: true }))
    .on("error", (error) => {
        throw error.message;
    })
    .on("data", async (row) => {
        try {
            stream.pause();
            const mobile = Helper.validateMobile(row.Telephone)

            const validationerrors = await Helper.ValidateCustomer(null, mobile, row.code, "BULK");

            console.log(`validationerrors ---> ${validationerrors} Telephone--> ${row.Telephone}`)

            if (validationerrors.length) {
                throw validationerrors.message

            } else {
                csvData.push(row);
            }
        } finally {
            stream.resume();
        }
    })
    .on("end", async () => {
        console.log("ENDDD ---->", csvData)
    });

The console.log output looks like this. The final row (the one with Telephone 777223478) is received after on("end") has already fired.

    validationerrors --->  Telephone--> 778786516
    validationerrors --->  Telephone--> 718254596 
    validationerrors --->  Telephone--> 712760763 
    
    ENDDD ----> [ 
       { 
        Code: 'CTLD000323',
        Telephone: '778786516' }, 
       { 
        Code: 'CTLD000324',  
        Telephone: '718254596' }, 
       { 
        Code: 'CTLD000376',    
        Telephone: '712760763' } 
      ] 
    
    validationerrors --->  Telephone--> 777223478

Solution

  • It appears that the CSV library does not respect the .pause() method: if it already has multiple rows queued up, it goes ahead and fires data events for them even while the stream is paused. The two ways I know of to fix that are to modify the csv module so it respects the stream pause, or to queue up rows as they come in so you can defer processing them yourself while you're still in the middle of processing a previous row. You also have to defer the end event, since the stream will emit it before you're done processing; I've handled that by synthesizing my own finalEnd event. Here's a rough implementation of a queue that buffers data events arriving while a row is still being processed.

    // assumes fs = require("fs") and a csv parser such as fast-csv
    // (csv = require("fast-csv")) -- adjust to whichever module you're using
    const csvData = [];     // rows that passed validation
    let paused = false;     // true while queued rows are being processed
    const queue = [];       // rows received while processing is in progress
    let end = false;        // set when the parser's own "end" event fires

    const stream = fs.createReadStream(filePath)
        .pipe(csv.parse({ headers: true }))
        .on("error", (error) => {
            throw error.message;
        })
        .on("data", async (row) => {
            // always queue the row; only the first invocation drains the queue
            queue.push(row);
            if (!paused) {
                stream.pause();
                paused = true;
                while (queue.length) {
                    try {
                        await processRow(queue.shift());
                    } catch (e) {
                        // decide what to do here if you get an error processing a row
                        console.log(e);
                    }
                }
                paused = false;
                stream.resume();
                // if "end" already fired while we were processing, finish now
                if (end) {
                    stream.emit("finalEnd");
                }
            }

            async function processRow(row) {
                const mobile = Helper.validateMobile(row.Telephone);

                const validationerrors = await Helper.ValidateCustomer(null, mobile, row.code, "BULK");

                console.log(`validationerrors ---> ${validationerrors} Telephone--> ${row.Telephone}`);

                if (validationerrors.length) {
                    throw validationerrors.message;
                } else {
                    csvData.push(row);
                }
            }
        })
        .on("end", () => {
            end = true;
            // only finish immediately if nothing is still being processed
            if (!queue.length && !paused) {
                stream.emit("finalEnd");
            }
        })
        .on("finalEnd", () => {
            console.log("ENDDD ---->", csvData);
        });
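
  As a side note, on Node.js 10+ readable streams are async iterable, so a variation on the same idea is to consume the parsed stream with for await...of, which only pulls the next row once the previous one has finished processing. The sketch below is not part of the original answer: it assumes the stream returned by csv.parse({ headers: true }) is a standard Node transform stream (fast-csv's is), reuses the question's Helper calls, and the loadCsv wrapper name is just for illustration.

    // Rough alternative sketch: async iteration instead of a manual queue.
    // Assumes Node 10+ and a standard transform stream from the csv parser.
    async function loadCsv(filePath) {
        const csvData = [];
        const stream = fs.createReadStream(filePath)
            .pipe(csv.parse({ headers: true }));

        // for await...of only requests the next row after this iteration's
        // awaits have resolved, so no pause()/resume() bookkeeping is needed
        for await (const row of stream) {
            const mobile = Helper.validateMobile(row.Telephone);
            const validationerrors = await Helper.ValidateCustomer(null, mobile, row.code, "BULK");
            if (validationerrors.length) {
                throw validationerrors.message;
            }
            csvData.push(row);
        }

        // reached only after every row has been validated
        console.log("ENDDD ---->", csvData);
        return csvData;
    }

  With this approach there is no need for a queue, a paused flag, or a synthesized finalEnd event, because back-pressure is handled by the async iterator itself, and stream errors simply reject the surrounding promise.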