node.js, zlib, sax, pg

Records are lost when sax inserts into a database in batches


I read a gz file using sax and zlib and then update my db; it seems that some of my file content gets skipped.

This is my code:

// variables to demonstrate the weird behavior
let recordsNum = 1;
let recordsNum2 = 1;
let numOfFailed = 0;

async function boo(...args){
  const saxStream = sax.createStream(true); // strict mode
  let currentElement = null;
  let currentObject = null;

    saxStream.on("opentag", (node) => {
    if (node.name === "Item") {
      currentObject = {};
    } else {
      currentElement = keyTransforms[node.name] ;
    }
  });

  saxStream.on("closetag", async (nodeName) => {
    if (nodeName === "Item") {
      // process the current object
      recordsNum++;
      if (arrOfRecoreds.length <= 100 && currentObject) {
        arrOfRecoreds.push(currentObject);
        recordsNum2++;
      }

      if (!currentObject) {
        numOfFailed++;
      }

      if (arrOfRecoreds.length === 100) {
        await insertBatch(arrOfRecoreds);
        arrOfRecoreds = [];
      }

      // reset the current object
      currentObject = null;
    } else {
      currentElement = null;
    }
  });


  const readStream = fs
    .createReadStream("./temp/" + fileName)
    .pipe(zlib.createGunzip())
    .pipe(saxStream);

  await new Promise((resolve, reject) => {
    readStream.on("end", async () => {
      if (arrOfRecoreds.length > 0) {
        await insertBatch(arrOfRecoreds);
        resolve();
      }
    });
    readStream.on("error", reject);
  });

  return new Promise((res) => {
    pool.end();
    console.log("Finished processing file.");

    res({
      statusCode: 201,
      message: "update db",
    });
  });
}

where the insertBatch function just updates the db according to the records:

async function insertBatch(recoreds) {
  const client = await pool.connect();
  try {
    await client.query("BEGIN");
    await doSomthing(recoreds);
    await client.query("COMMIT");
  } catch (e) {
    await client.query("ROLLBACK");
  } finally {
    client.release();
  }
}

The code runs without errors, but it contains some bugs I can't understand:

First bug: when invoking boo:

boo("str", "filename.rar").then(() => {
  console.log("num of recordes ", recordsNum);
  console.log("num of recordes2 ", recordsNum2);
  console.log("num of failed ", numOfFailed);
});

The logs are: num of recordes 100, num of recordes2 45, num of failed 6.

  1. Why is recordsNum not equal to recordsNum2?
  2. Why are there failed objects? The XML file is definitely valid!
  3. The values of recordsNum2 and numOfFailed change on every run.

These bugs make the script problematic, because it does not insert all of the records into my Postgres db.


Solution

  • You have not declared arrOfRecoreds in your code; it needs a declaration such as let arrOfRecoreds = []; before the handlers use it.

    Your closetag handler is asynchronous, but sax does not wait for asynchronous handlers to finish. This means that after you start the database operation with insertBatch(arrOfRecoreds), sax continues parsing: more closetag events are emitted and new entries are pushed into arrOfRecoreds before the database operation finishes and arrOfRecoreds = [] is executed. That reset then discards every entry pushed while the insert was still in flight.

    This can be avoided if you don't await the database operation and reset arrOfRecoreds immediately after triggering it:

    if (arrOfRecoreds.length === 100) {
      dbOperations.push(insertBatch(arrOfRecoreds));
      arrOfRecoreds = [];
    }
    

    Here, dbOperations is an array that you declare as var dbOperations = []; in the same place where you declare arrOfRecoreds. It contains one promise per insertBatch operation, and you must wait for all of these promises to be fulfilled before you close your database connection pool once the stream has finished (a complete sketch follows below):

    await Promise.all(dbOperations);
    pool.end();
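
    To see why awaiting inside the handler does not pause the parser, here is a minimal, self-contained sketch with a plain Node.js EventEmitter (the event name and timing are made up for illustration). sax streams emit their events through the same mechanism, and emit() never waits for the promise that an async listener returns:

    const { EventEmitter } = require("events");

    const emitter = new EventEmitter();

    // An async listener returns a promise, but emit() ignores it.
    emitter.on("item", async (i) => {
      await new Promise((r) => setTimeout(r, 10)); // simulate a slow DB insert
      console.log("finished handling item", i);
    });

    for (let i = 1; i <= 3; i++) {
      emitter.emit("item", i); // returns immediately, before the listener finishes
      console.log("emitted item", i);
    }
    // All "emitted item" lines are printed before any "finished handling item" line.

    Putting the declaration fix and the batching fix together, a corrected boo could look roughly like the sketch below. This is only a sketch under the assumptions of your question (sax, zlib, fs, pool, keyTransforms and insertBatch exist as shown, and the parameters are named to match the invocation); it drops the debugging counters and flushes the final partial batch after the stream has ended:

    async function boo(str, fileName) {
      const saxStream = sax.createStream(true); // strict mode
      let currentElement = null;
      let currentObject = null;

      let arrOfRecoreds = []; // was never declared in the original code
      let dbOperations = [];  // one promise per insertBatch call

      saxStream.on("opentag", (node) => {
        if (node.name === "Item") {
          currentObject = {};
        } else {
          currentElement = keyTransforms[node.name];
        }
      });

      // Note: the handler is no longer async, so nothing runs "late".
      saxStream.on("closetag", (nodeName) => {
        if (nodeName === "Item") {
          if (currentObject) {
            arrOfRecoreds.push(currentObject);
          }
          if (arrOfRecoreds.length === 100) {
            dbOperations.push(insertBatch(arrOfRecoreds)); // do not await here
            arrOfRecoreds = [];                            // reset immediately
          }
          currentObject = null;
        } else {
          currentElement = null;
        }
      });

      const readStream = fs
        .createReadStream("./temp/" + fileName)
        .pipe(zlib.createGunzip())
        .pipe(saxStream);

      await new Promise((resolve, reject) => {
        readStream.on("end", resolve);
        readStream.on("error", reject);
      });

      // Flush the last (partial) batch.
      if (arrOfRecoreds.length > 0) {
        dbOperations.push(insertBatch(arrOfRecoreds));
      }

      // Wait for every batch before closing the connection pool.
      await Promise.all(dbOperations);
      await pool.end();

      console.log("Finished processing file.");
      return { statusCode: 201, message: "update db" };
    }

    Because every insertBatch promise is collected in dbOperations, Promise.all(dbOperations) only resolves once every batch has been committed, so pool.end() can no longer run while inserts are still in flight.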