Tags: node.js, mongodb, multithreading, algorithm, server

Performing calculations over a huge dataset in MongoDB with Node.js/Express.js backend code | Optimising time and memory


I have to create connections across 700,000 flight records. This is the flight schema:

const mongoose = require('mongoose');

const flightSchema = new mongoose.Schema(
  {
    date: {
      type: Date,
    },
    flightName: {
      type: String,
    },
    departureStn: {
      type: String,
    },
    scheduleTimeofDeparture: { // referred to as std in the code below
      type: String,
    },
    blockTime: {
      type: String,
    },
    scheduleTimeofArrival: {
      type: String,
    },
    arrivalStation: { // referred to as arrStn in the code below
      type: String,
    },
    beyondODs: [
      {
        type: mongoose.Schema.Types.ObjectId,
        ref: 'FLIGHT',
      },
    ],
    behindODs: [
      {
        type: mongoose.Schema.Types.ObjectId,
        ref: 'FLIGHT',
      },
    ],
  }
);

This flight data is created for each day from a simple form with flight name, fromDate, toDate, and DaysOfWeek. For example, AA 323 with fromDate 01 December 2024, toDate 31 December 2024, and DaysOfWeek Monday, Tuesday, Wednesday creates 14 entries in the flights collection.
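The expansion from one form row into daily documents can be sketched as follows. This is a hypothetical helper, not the asker's code: dates are represented as day numbers since 1970-01-01 (computed with Howard Hinnant's `days_from_civil` algorithm), and weekdays are numbered 0 = Sunday to 6 = Saturday. Expanding 1-31 December 2024 with Monday, Tuesday, and Wednesday yields the 14 dates from the example.

```cpp
#include <cassert>
#include <set>
#include <vector>

// Days since 1970-01-01 for a proleptic Gregorian date
// (Howard Hinnant's days_from_civil algorithm).
long daysFromCivil(int y, unsigned m, unsigned d) {
    y -= m <= 2 ? 1 : 0;
    const long era = (y >= 0 ? y : y - 399) / 400;
    const unsigned yoe = static_cast<unsigned>(y - era * 400);              // [0, 399]
    const unsigned doy = (153 * (m > 2 ? m - 3 : m + 9) + 2) / 5 + d - 1; // [0, 365]
    const unsigned doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;             // [0, 146096]
    return era * 146097 + static_cast<long>(doe) - 719468;
}

// Day of week for a day number: 0 = Sunday ... 6 = Saturday
// (1970-01-01 was a Thursday).
unsigned weekdayFromDays(long z) {
    return static_cast<unsigned>(z >= -4 ? (z + 4) % 7 : (z + 5) % 7 + 6);
}

// Hypothetical expansion of one schedule row: every day in [fromDay, toDay]
// whose weekday is in daysOfWeek gets one flight document.
std::vector<long> expandSchedule(long fromDay, long toDay,
                                 const std::set<unsigned>& daysOfWeek) {
    std::vector<long> days;
    for (long day = fromDay; day <= toDay; ++day) {
        if (daysOfWeek.count(weekdayFromDays(day)) > 0) {
            days.push_back(day);
        }
    }
    return days;
}
```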

Now I have to create connections, i.e. find the previous and next flights for EACH flight in one go, and store their IDs in each flight's beyondODs (next) and behindODs (previous) arrays. Because I have to support 700,000 flights, this becomes very complex.

I used brute force. It works for smaller datasets, but on the full dataset it crashes the server, overloads memory, or takes a very long time (4-5 hrs) to finish all connections.

This is how I am doing it:

async function createConnections(req, res) {
  const userId = req.user.id;
  const stationsMap = {};

  // Stations hold the time range allowed for a connection, e.g. SIN allows 2 hrs for
  // domestic and 7 hrs for international flights. These values are station-specific,
  // so I prefetch them.
  const stations = await Stations.find({ userId: userId });
  for (const station of stations) {
    stationsMap[station.stationName] = station;
  }

  const allFlights = await Flights.find({ userId: userId });
  await Flights.updateMany(
    { userId: userId }, // match all flights for this user
    { $set: { beyondODs: [], behindODs: [] } } // clear the existing IDs
  );

  // loop over every flight of this user
  for (const flight of allFlights) {
    const stationArr = stationsMap[flight.arrStn];
    const stationDep = stationsMap[flight.depStn];

    if (!stationArr || !stationDep) {
      continue; // no station settings: skip to the next flight
    }

    const stdHTZ = convertTimeToTZ(flight.std, stationDep.stdtz, stationArr.stdtz);
    // const staHTZ = convertTimeToTZ(flight.sta, stationArr.stdtz, hometimeZone);

    // domQuery finds domestic onward flights
    const domQuery = {
      depStn: flight.arrStn,
      arrStn: { $ne: flight.depStn }, // prevent routing straight back
      domIntl: { $regex: new RegExp('dom', 'i') },
    };

    // intlQuery finds international onward flights
    const intlQuery = {
      depStn: flight.arrStn,
      arrStn: { $ne: flight.depStn },
      domIntl: { $regex: new RegExp('intl', 'i') },
    };

    // some extra code to calculate whether the onward flight is on the same day or the next day
    // ...

    if (sameDayInt) {
      intlQuery.std = { $gte: inInMinStdLT, $lte: inInMaxStdLT };
      intlQuery.date = new Date(flight.date);
    } else if (nextDayInt) {
      intlQuery.std = { $gte: inInMinStdLT, $lte: inInMaxStdLT };
      intlQuery.date = new Date(addDays(flight.date, 1));
    } else if (partialDayInt) {
      // the window crosses midnight: split it into a same-day part and a next-day part
      intlQuery.$or = [
        {
          std: { $gte: inInMinStdLT, $lte: '23:59' },
          date: { $gte: flightDateUTC, $lt: nextDayDateUTC },
        },
        {
          std: { $gte: '00:00', $lte: ininmaxMinusB },
          date: { $gte: nextDayDateUTC, $lt: addDays(nextDayDateUTC, 1) },
        },
      ];
    }
    // ...the loop body continues below
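The same-day, next-day, and partial-day branches above all describe one connection window that may cross midnight. A minimal sketch, assuming both times are already in the same time zone (as `convertTimeToTZ` appears to ensure) and that `std` is stored as `"HH:MM"`, shows how converting to absolute minutes (`dayNumber * 1440 + minutesOfDay`) turns the three cases into one range check. The names `minutesOfDay`, `absoluteMinute`, and `withinConnectionWindow` are hypothetical.

```cpp
#include <cassert>
#include <string>

// Minutes since midnight for an "HH:MM" string (assumed format of std/sta).
int minutesOfDay(const std::string& hhmm) {
    return std::stoi(hhmm.substr(0, 2)) * 60 + std::stoi(hhmm.substr(3, 2));
}

// Absolute minute: day number * 1440 + minutes since midnight.
long absoluteMinute(long dayNumber, const std::string& hhmm) {
    return dayNumber * 1440 + minutesOfDay(hhmm);
}

// True if the departure lies in [arrival + minConnect, arrival + maxConnect].
// A window that crosses midnight needs no special case.
bool withinConnectionWindow(long arrivalAbs, long departureAbs,
                            int minConnectMin, int maxConnectMin) {
    return departureAbs >= arrivalAbs + minConnectMin &&
           departureAbs <= arrivalAbs + maxConnectMin;
}
```

In MongoDB terms, the analogous design would be to store each scheduled departure as a single UTC `Date`, so that one `$gte`/`$lte` range on that field could replace the `$or` of two date/time ranges.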

Finally, I fetch the matching flights and update the beyondODs and behindODs arrays:

    const domFlights = await Flights.find(domQuery);
    const intlFlights = await Flights.find(intlQuery);

    const update = {
      $set: {
        beyondODs: [...domFlights.map(f => f._id), ...intlFlights.map(f => f._id)],
      },
    };

    await Flights.updateOne({ _id: flight._id }, update);

    // Update the behindODs field in domFlights and intlFlights
    if (!flight._id) {
      console.error('Flight _id is undefined or null');
      // handle the error accordingly, e.g. by skipping this update operation
    } else {
      // add flight._id with $addToSet only if it is valid
      for (const f of domFlights) {
        await Flights.updateOne({ _id: f._id }, { $addToSet: { behindODs: flight._id } });
      }
      for (const f of intlFlights) {
        await Flights.updateOne({ _id: f._id }, { $addToSet: { behindODs: flight._id } });
      }
    }
  } // end of for (const flight of allFlights)
}
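In the loop above, every connection costs an extra `updateOne` for `behindODs`. Because `behindODs` is just the reverse of `beyondODs`, it can be derived in memory in one pass once all `beyondODs` lists are known. A minimal sketch, using plain array indices as hypothetical stand-ins for the `_id` values:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// beyond[i] lists the indices of flights that can follow flight i.
// behind is the reverse relation: behind[j] lists the flights that lead into j.
std::vector<std::vector<std::size_t>> buildBehind(
    const std::vector<std::vector<std::size_t>>& beyond) {
    std::vector<std::vector<std::size_t>> behind(beyond.size());
    for (std::size_t from = 0; from < beyond.size(); ++from) {
        for (std::size_t to : beyond[from]) {
            behind[to].push_back(from); // flight `from` precedes flight `to`
        }
    }
    return behind;
}
```

Both arrays of each flight could then be written together in batched updates (for example with Mongoose's `Model.bulkWrite`) instead of one `updateOne` per connection. The sketch assumes each `beyond` list has no duplicates, which `$addToSet` would otherwise guarantee.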

How can I complete this in a reasonable time? I am not very experienced with problems of this size, so any help or suggestions are appreciated.

I asked ChatGPT, and it recommended batch processing and bulk writes. That prevents the crashes and the memory heap limit errors, but it does not reduce the time much (still 3 hrs).

Other LLMs, such as Meta's, recommend cluster-based multithreading or the bull package.


Solution

  • If flight A arrives at an airport before flight B departs from that same airport, then they are 'connected'.

    If flights X and Y do not share any airport for departures and arrivals, then they cannot be connected in any way.

    LOOP A over every airport
        Read into memory every arrival at A (one query to DB)
        Read into memory every departure at A (one query to DB)
        LOOP R over arrivals
            LOOP D over departures
                IF D later than R
                    Save into memory connection between R and D
            ENDLOOP
        ENDLOOP
        Save to database all connections at A (one query to DB)
    ENDLOOP
    

    The two inner loops run entirely in memory, so they are very fast. The total time depends mainly on

    3 * database query time * number of airports
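A rough count of database round trips shows where the time goes. It uses the question's 700,000 flights and, as an assumption, the 1,000 airports of the test below. In the question's loop each flight issues two `find` calls (`domQuery`, `intlQuery`), one `updateOne` for `beyondODs`, and one `updateOne` per onward flight found ($c_f$, with total $C = \sum_f c_f$). The per-airport count treats each airport's batch of connection inserts as one query, as described above.

```latex
\begin{aligned}
Q_{\text{per-flight}}  &= \sum_{f=1}^{700\,000} \left(2 + 1 + c_f\right) = 2\,100\,000 + C \\
Q_{\text{per-airport}} &= 3 \times 1\,000 = 3\,000
\end{aligned}
```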
    

    Run your database server on the same machine as your application or, better yet, move to SQLite.

    Below is some C++ code that runs the above algorithm using the SQLite database engine. It finds and stores the connections among 700,000 random flights between 1,000 airports in 150 seconds (under 3 minutes).

    #include <string>
    #include <fstream>
    #include <sstream>
    #include <iostream>
    #include <vector>
    #include <algorithm>
    #include <cstdlib>   // rand
    #include <stdexcept> // std::runtime_error
    
    #include "sqliteClass.h" // https://github.com/JamesBremner/sqliteClass
    
    #include "cRunWatch.h"
    
    raven::sqliteClass db;
    
    struct sFlight
    {
        int name;
        int arr;
        int arrtime;
        int dep;
        int deptime;
    };
    
    void createDB()
    {
        db.open("flights.dat");
        db.exec("PRAGMA synchronous=OFF");
        db.exec(
            "CREATE TABLE IF NOT EXISTS flights "
            " ( name, dep, dep_time, arr, arr_time );");
        db.exec(
            "CREATE TABLE IF NOT EXISTS conx "
            " ( arr, dep );");
    }
    
    void gen(int flightCount, int airportCount)
    {
        db.exec(
            "DELETE FROM flights;");
        auto stmt = db.prepare(
            "INSERT INTO flights "
            " VALUES ( ?1, ?2, ?3, ?4, ?5 );");
        for (int k = 0; k < flightCount; k += 100)
        {
            std::cout << k << " ";
    
            db.exec("BEGIN TRANSACTION");
            for (int k2 = 0; k2 < 100; k2++)
            {
                int dep = rand() % airportCount;
                int arr;
                while (1)
                {
                    arr = rand() % airportCount;
                    if (arr != dep)
                        break;
                }
                int deptime = rand() % 2000;
                int arrtime;
                while (1)
                {
                    arrtime = rand() % 2400;
                    if (arrtime > deptime)
                        break;
                }
    
                db.bind(stmt, 1, k+k2);
                db.bind(stmt, 2, dep);
                db.bind(stmt, 3, deptime);
                db.bind(stmt, 4, arr);
                db.bind(stmt, 5, arrtime);
                db.exec(stmt);
            }
            db.exec("END TRANSACTION");
        }
    }
    
    void readArrivals(
        std::vector<sFlight> &vArrivals,
        int airport,
        raven::sqliteClassStmt *stmt)
    {
        raven::set::cRunWatch aWatcher("readArrivals");
        db.bind(stmt, 1, airport);
        db.exec(stmt,
                [&](raven::sqliteClassStmt &stmt) -> bool
                {
                    sFlight F;
                    F.name = stmt.column_int(0);
                    F.dep = stmt.column_int(1);
                    F.deptime = stmt.column_int(2);
                    F.arr = stmt.column_int(3);
                    F.arrtime = stmt.column_int(4);
                    vArrivals.push_back(F);
                    return true;
                });
    }
    void readDeps(
        std::vector<sFlight> &vDeps,
        int airport,
        raven::sqliteClassStmt *stmt)
    {
        raven::set::cRunWatch aWatcher("readDeps");
        db.bind(stmt, 1, airport);
        db.exec(stmt,
                [&](raven::sqliteClassStmt &stmt) -> bool
                {
                    sFlight F;
                    F.name = stmt.column_int(0);
                    F.dep = stmt.column_int(1);
                    F.deptime = stmt.column_int(2);
                    F.arr = stmt.column_int(3);
                    F.arrtime = stmt.column_int(4);
                    vDeps.push_back(F);
                    return true;
                });
    }
    
    void findConnections(int airportCount)
    {
        // start timing
        raven::set::cRunWatch aWatcher("findConnections");
    
        /*
        LOOP A over every airport
            Read into memory every arrival at A ( one query to DB )
            Read into memory every departure at A ( one query to DB )
            LOOP R over arrivals
               LOOP D over departures
                  IF D later than R
                      Save into memory connection between R and D
               ENDLOOP
            ENDLOOP
       Save to DataBase all connections at A ( one query to DB )
       ENDLOOP
    
    */
    
        auto stmtArrs = db.prepare(
            "SELECT * FROM flights "
            "WHERE arr = ?1;");
        auto stmtDeps = db.prepare(
            "SELECT * FROM flights "
            "WHERE dep = ?1;");
        auto stmtConx = db.prepare(
            "INSERT INTO conx "
            "VALUES ( ?1, ?2 );");
        db.exec(
            "DELETE FROM conx;");
    
        // loop over airports
        for (int ka = 0; ka < airportCount; ka++)
        {
            std::vector<sFlight> vArrivals, vDeps;
            std::vector<std::pair<sFlight, sFlight>> vConnects;
    
            std::cout << "Airport " << ka << " ";
    
            // read arrivals at airport
            readArrivals(vArrivals, ka, stmtArrs);
    
            // read departures from airport
            readDeps(vDeps, ka, stmtDeps);
    
            // combine airport insertions into one transaction
            db.exec("BEGIN TRANSACTION");
            // if (db.exec("BEGIN TRANSACTION"))
            //     throw std::runtime_error(
            //         "TRANSACTION FAILED");
    
            for (auto &r : vArrivals)
            {
                for (auto &d : vDeps)
                {
                    // connect arrivals to later departures
                    if (d.deptime > r.arrtime)
                    {
                        // save arrival flight name and departure flight name
                        db.bind(stmtConx, 1, r.name);
                        db.bind(stmtConx, 2, d.name);
                        if (db.exec(stmtConx))
                            throw std::runtime_error(
                                "INSERTION FAILED");
                    }
                }
            }
            db.exec("END TRANSACTION");
        }
    }
    
    int main()
    {
        // initialize timer https://ravenspoint.wordpress.com/2010/06/16/timing/
        raven::set::cRunWatch::Start();
    
        // Construct database
        createDB();
    
        // Construct random flights between random airports
        const int flightCount = 700000;
        const int airportCount = 1000;
        std::cout << "Flights in DB " << flightCount << "\n";
        gen(flightCount, airportCount);
    
        // find and store connections
        findConnections(airportCount);
    
        // timing report
        raven::set::cRunWatch::Report();
        return 0;
    }
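The inner loop of `findConnections` connects each arrival to every later departure at the airport. If a connection must instead fall inside a station-specific window, as in the question (e.g. 2 hrs for domestic flights at SIN), a minimal sketch that swaps in a different technique, sorting the departures once and finding each arrival's window with two binary searches, could look like this. `Leg`, `connectAtAirport`, `minConnect`, and `maxConnect` are hypothetical names, and times are assumed to be absolute minutes so a window that crosses midnight needs no special case.

```cpp
#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>

struct Leg {
    int name; // flight identifier
    int time; // arrival time (arrivals) or departure time (departures), in absolute minutes
};

// Connect each arrival at one airport to the departures whose time lies in
// [arrival + minConnect, arrival + maxConnect].
std::vector<std::pair<int, int>> connectAtAirport(const std::vector<Leg>& arrivals,
                                                  std::vector<Leg> departures,
                                                  int minConnect, int maxConnect) {
    if (minConnect > maxConnect) {
        return {}; // empty window: no connections
    }
    auto byTime = [](const Leg& a, const Leg& b) { return a.time < b.time; };
    std::sort(departures.begin(), departures.end(), byTime); // sort once per airport

    std::vector<std::pair<int, int>> connections; // (arrival name, departure name)
    for (const Leg& r : arrivals) {
        const Leg lo{0, r.time + minConnect};
        const Leg hi{0, r.time + maxConnect};
        // first departure with time >= lo.time
        auto first = std::lower_bound(departures.begin(), departures.end(), lo, byTime);
        // first departure with time > hi.time
        auto last = std::upper_bound(departures.begin(), departures.end(), hi, byTime);
        for (auto it = first; it != last; ++it) {
            connections.emplace_back(r.name, it->name);
        }
    }
    return connections;
}
```

Sorting costs O(D log D) per airport, and each arrival then costs O(log D) plus the number of connections it produces, instead of O(D) comparisons per arrival.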