Tags: pandas, parallel-processing, networkx, graph-theory, networkit

Graph (networkit) - create edges from the record pairs that share the same value in two or more of the same columns in pandas


I'm trying to create a graph with edges only between nodes (record indices in the dataframe) that have the same values in any 2 or more columns.
What I'm doing: I create a list with all possible combination pairs of column names and go through them searching for duplicates, extracting the indices of the duplicated rows and creating edges between them.
The problem is that for huge datasets (millions of records) this solution is too slow and requires too much memory.

What I do:

import pandas as pd

df = pd.DataFrame({
    'A': [1, 2, 3, 4, 5],
    'B': [1, 1, 1, 1, 2],
    'C': [1, 1, 2, 3, 3],
    'D': [2, 7, 9, 8, 4]})
   A  B  C  D
0  1  1  1  2
1  2  1  1  7
2  3  1  2  9
3  4  1  3  8
4  5  2  3  4

Here, rows 0 and 1 have the same values in 2 columns, B and C.
So, for nodes 0, 1, 2, 3, 4 I need to create the single edge 0-1. Every other pair of records shares at most 1 value.

    import numpy as np
    import networkit as nk

    num_nodes = len(df)
    column_names = np.array(df.columns)

    graph = nk.Graph(num_nodes, directed=False, weighted=False)

    # Get the indices of all unique pairs
    indices = np.triu_indices(len(column_names), k=1)
    # Get the unique pairs of column names
    unique_pairs = np.column_stack((column_names[indices[0]], column_names[indices[1]]))

    for col1, col2 in unique_pairs:
        # Keep only the rows that are duplicated in this pair of columns
        duplicated_rows = df[[col1, col2]].dropna()
        duplicated_rows = duplicated_rows[duplicated_rows.duplicated(subset=[col1, col2], keep=False)]

        # Connect every pair of row indices within each duplicate group
        for _, group in duplicated_rows.groupby([col1, col2]):
            tb_ids = group.index.tolist()
            for i in range(len(tb_ids)):
                for j in range(i + 1, len(tb_ids)):
                    graph.addEdge(tb_ids[i], tb_ids[j])

Main question: how can I speed up / improve this solution? I was thinking about parallelizing by column combination, but in that case I can't figure out how to create the edges in the graph properly.
I'd appreciate any help.


Solution

  • Memory Problem

    Your multi-million record input generates so many pairs that they cannot all be kept in memory.

    You will have to give up storing everything in memory and instead keep the data in a highly optimized database. I suggest SQLite. Bring input data into memory as required and store the pairs to the database as they are found. If you properly optimize your use of SQLite, the performance hit will be minimal and you will not run out of memory.

  • Performance Problem

    Storing pairs to a database will slow the performance slightly.

    You will need to optimize how you use the database. The two most important optimizations are:

    • Transaction Grouping. Keep the pairs in memory as they are found. When the pair count reaches a specified number, write them all to the database in one transaction.

    • Asynchronous Write. Once you have handed off the writes to the db engine, do not wait for confirmation that the write succeeded - just blaze ahead with the pair search.
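
    Since the question is in Python, here is what these two optimizations might look like with the standard library's sqlite3 module - a minimal sketch, assuming a pair (r1, r2) table like the one used in the C++ example below; the helper names and the buffer size are illustrative:

        import sqlite3

        conn = sqlite3.connect("pair.db")
        conn.execute("PRAGMA synchronous = OFF")  # asynchronous write: don't wait for the disk
        conn.execute("CREATE TABLE IF NOT EXISTS pair (r1 INTEGER, r2 INTEGER)")

        pair_buffer = []
        TRANSACTION_SIZE = 10_000  # pairs written per transaction - tune for your workload

        def flush_pairs():
            with conn:  # wraps the whole batch in a single transaction
                conn.executemany("INSERT INTO pair VALUES (?, ?)", pair_buffer)
            pair_buffer.clear()

        def add_pair(r1, r2):
            pair_buffer.append((r1, r2))
            if len(pair_buffer) >= TRANSACTION_SIZE:
                flush_pairs()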

    You forgot to state your performance requirement! However, whatever your requirement might be, I will assume that you will need to squeeze out a significant improvement.

    I see that you are using Python. This is an interpreted language, so the performance will be sluggish. Switching to a compiled language will give you a significant performance boost. For example, well-coded C++ can give an improvement of up to 50 times.

    Algorithm

    SET T number of pairs to write in one DB transaction
    LOOP N over all records
       IF N has 2 or more identical values
          LOOP M over records N+1 to last
              LOOP C over columns
                  LOOP D over cols C+1 to last
                     IF N[C] == N[D] == M[C] == M[D]
                         SAVE M,N to memory pair store
                         IF memory pair store size >= T
                             WRITE memory pair store to DB
                             CLEAR memory pair store
    WRITE memory pair store to DB
    

    Example:

    Here is an implementation of these ideas in C++ that finds ~6,000,000 pairs in 100,000 records in 40 seconds on a modest laptop.

    #include <string>
    #include <fstream>
    #include <sstream>
    #include <iostream>
    #include <vector>
    #include <algorithm>
    #include <time.h>
    #include "sqlite3.h"
    #include "cRunWatch.h" // https://ravenspoint.wordpress.com/2010/06/16/timing/
    
    std::vector<std::vector<int>> vdata;
    
    class cPairStorage
    {
        std::vector<std::pair<int, int>> vPair;
        sqlite3 *db;
        char *dbErrMsg;
        int transactionCount;
    
    public:
        cPairStorage();
    
        void add(int r1, int r2)
        {
            vPair.push_back(std::make_pair(r1, r2));
            if ((int)vPair.size() >= transactionCount)
                writeDB();
        }
    
        void writeDB();
    
        int count();
    
        std::pair<int, int> get(int index);
    };
    
    cPairStorage pairStore;
    
    cPairStorage::cPairStorage()
    : transactionCount(500)
    {
        int ret = sqlite3_open("pair.db", &db);
        if (ret)
            throw std::runtime_error("failed to open db");
        ret = sqlite3_exec(db,
                           "CREATE TABLE IF NOT EXISTS pair (r1, r2);",
                           0, 0, &dbErrMsg);
        ret = sqlite3_exec(db,
                           "DELETE FROM pair;",
                           0, 0, &dbErrMsg);
        ret = sqlite3_exec(db,
                           "PRAGMA synchronous = OFF;", // asynchronous write: do not wait for disk confirmation
                           0, 0, &dbErrMsg);
    }
    
    void cPairStorage::writeDB()
    {
        //raven::set::cRunWatch aWatcher("writeDB");
    
        sqlite3_stmt *stmt;
        int ret = sqlite3_prepare_v2(
            db,
            "INSERT INTO pair VALUES ( ?1, ?2 );",
            -1, &stmt, 0);
    
        ret = sqlite3_exec(
            db,
            "BEGIN TRANSACTION;",
            0, 0, &dbErrMsg);
    
        for (auto &p : vPair)
        {
            ret = sqlite3_bind_int(stmt, 1, p.first);
            ret = sqlite3_bind_int(stmt, 2, p.second);
            ret = sqlite3_step(stmt);
            ret = sqlite3_reset(stmt);
        }
    
        ret = sqlite3_exec(
            db,
            "END TRANSACTION;",
            0, 0, &dbErrMsg);
    
        sqlite3_finalize(stmt); // release the prepared statement
    
        //std::cout << "stored " << vPair.size() << "\n";
    
        vPair.clear();
    }
    
    int cPairStorage::count()
    {
        int ret;
    
        sqlite3_stmt *stmt;
        ret = sqlite3_prepare_v2(
            db,
            "SELECT count(*) FROM pair;",
            -1, &stmt, 0);
        ret = sqlite3_step(stmt);
    
        int count = sqlite3_column_int(stmt, 0);
        ret = sqlite3_finalize(stmt); // finalize, not reset: the statement is re-prepared each call
        return count;
    }
    
    std::pair<int, int> cPairStorage::get(int index)
    {
        // rowids are 1-based, so valid indices run from 1 to count()
        if (index < 1 || index > count())
            throw std::runtime_error("bad pair index");
    
        std::pair<int, int> pair;
        int ret;
        sqlite3_stmt *stmt;
        ret = sqlite3_prepare_v2(
            db,
            "SELECT * FROM pair WHERE rowid = ?1;",
            -1, &stmt, 0);
        ret = sqlite3_bind_int(stmt, 1, index);
        ret = sqlite3_step(stmt);
        pair.first = sqlite3_column_int(stmt, 0);
        pair.second = sqlite3_column_int(stmt, 1);
        ret = sqlite3_finalize(stmt);
        return pair;
    }
    
    void generateRandom(
        int colCount,
        int rowCount,
        int maxValue)
    {
        srand(time(NULL));
        for (int krow = 0; krow < rowCount; krow++)
        {
            std::vector<int> vrow;
            for (int kcol = 0; kcol < colCount; kcol++)
                vrow.push_back(rand() % maxValue + 1);
            vdata.push_back(vrow);
        }
    }
    
    bool isPair(int r1, int r2)
    {
        auto &v1 = vdata[r1];
        auto &v2 = vdata[r2];
        for (int kc1 = 0; kc1 < v1.size(); kc1++)
        {
            for (int kc2 = kc1 + 1; kc2 < v1.size(); kc2++)
            {
                int tv = v1[kc1];
                if (tv != v1[kc2])
                    continue;
                if (tv != v2[kc1])
                    continue;
                if (tv != v2[kc2])
                    continue;
                return true;
            }
        }
        return false;
    }
    void findPairs()
    {
        raven::set::cRunWatch aWatcher("findPairs");
    
        int colCount = vdata[0].size();
    
        for (int kr1 = 0; kr1 < vdata.size(); kr1++)
        {
            // check whether this row has two columns with equal values,
            // so it can be part of a row pair
            bool pairPossible = false;
            for (int kc1 = 0; kc1 < colCount; kc1++)
            {
                for (int kc2 = kc1 + 1; kc2 < colCount; kc2++)
                {
                    if (vdata[kr1][kc1] == vdata[kr1][kc2])
                    {
                        pairPossible = true;
                        break;
                    }
                }
                if (pairPossible)
                    break;
            }
            if (!pairPossible)
                continue;
            for (int kr2 = kr1 + 1; kr2 < vdata.size(); kr2++)
                if (isPair(kr1, kr2))
                    pairStore.add(kr1, kr2);
        }
    
        pairStore.writeDB();
    }
    
    void display()
    {
        std::cout << "\nFound " << pairStore.count() << " pairs in " << vdata.size() << " records\n\n";
        std::cout << "First 2 pairs found:\n\n";
        for (int kp = 0; kp < 2; kp++)
        {
            auto p = pairStore.get(kp+1);
            for (int v : vdata[p.first])
                std::cout << v << " ";
            std::cout << "\n";
            for (int v : vdata[p.second])
                std::cout << v << " ";
            std::cout << "\n\n";
        }
    
        raven::set::cRunWatch::Report();
    }
    
    int main(int ac, char *argv[])
    {
        int rowCount = 10;
        if (ac == 2)
            rowCount = atoi(argv[1]);
    
        raven::set::cRunWatch::Start();
    
        generateRandom(
            5,        // columns
            rowCount, // rows
            20);      // max value
    
        findPairs();
    
        display();
    
        return 0;
    }
    

    Output from a test run

    >matcher --rows 100000 --trans 10000 --seed 571
    
    unit tests passed
    
    Found 6238872 pairs in 100000 records
    
    First 2 pairs found:
    
    4 4 13 18 18
    4 4 1 10 7
    
    4 4 13 18 18
    4 4 11 3 1
    
    raven::set::cRunWatch code timing profile
    Calls           Mean (secs)     Total           Scope
    1               40.3924         40.3924         findPairs
    

    Complete application with documentation in the github repo: https://github.com/JamesBremner/RecordMatcher
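
    To connect this back to the networkit graph in the question: once the pairs are safely in SQLite, you can stream them out row by row and add the edges without ever holding the full pair list in memory. A minimal Python sketch, assuming the pair table above and the num_nodes variable from the question:

        import sqlite3
        import networkit as nk

        conn = sqlite3.connect("pair.db")
        graph = nk.Graph(num_nodes, directed=False, weighted=False)

        # iterate over the cursor so pairs are fetched incrementally
        for r1, r2 in conn.execute("SELECT r1, r2 FROM pair"):
            graph.addEdge(r1, r2)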

    Multithreading

    It is straightforward to split the rows to be searched into two parts and search each part in its own thread. As often happens with multithreaded applications, the performance results at first are disappointing. However, by tuning the configuration parameters, I have achieved what seems like a worthwhile improvement (a Python sketch of the row-splitting idea follows the timing output below).

    Finds ~6,000,000 pairs in 100,000 records in 30 seconds on a modest laptop.

    >matcher --rows 100000 --trans 10000 --seed 571 --multi
    
    unit tests passed
    
    Found 6238872 pairs in 100000 records
    
    First 2 pairs found:
    
    4 4 13 18 18
    4 4 1 10 7
    
    4 4 13 18 18
    4 4 11 3 1
    
    raven::set::cRunWatch code timing profile
    Calls           Mean (secs)     Total           Scope
    1               29.6909         29.6909         findPairs
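
    The multithreaded search itself lives in the linked repo. Since the question is about Python, here is a hedged sketch of the same row-splitting idea using multiprocessing rather than threads (the GIL prevents CPU-bound speedups with threads in standard CPython); the helper names and the plain-list row format are illustrative, and the pair test mirrors isPair() above:

        from concurrent.futures import ProcessPoolExecutor

        def is_pair(row1, row2):
            # rows pair up if some column pair (c1, c2) holds one identical
            # value across both rows
            n = len(row1)
            return any(
                row1[c1] == row1[c2] == row2[c1] == row2[c2]
                for c1 in range(n)
                for c2 in range(c1 + 1, n)
            )

        def search_chunk(args):
            # each worker owns a slice of the outer loop, but still compares
            # its rows against every later row of the full dataset
            rows, start, stop = args
            return [
                (r1, r2)
                for r1 in range(start, stop)
                for r2 in range(r1 + 1, len(rows))
                if is_pair(rows[r1], rows[r2])
            ]

        def find_pairs_parallel(rows, workers=2):
            # split the outer row range into one chunk per worker
            bounds = [len(rows) * k // workers for k in range(workers + 1)]
            chunks = [(rows, bounds[k], bounds[k + 1]) for k in range(workers)]
            with ProcessPoolExecutor(max_workers=workers) as pool:
                return [p for part in pool.map(search_chunk, chunks) for p in part]

        if __name__ == "__main__":
            rows = [[1, 1, 1, 2], [2, 1, 1, 7], [3, 1, 2, 9], [4, 1, 3, 8], [5, 2, 3, 4]]
            print(find_pairs_parallel(rows))  # [(0, 1)] for the question's dataframe

    Note that equal-size chunks are unbalanced - earlier rows are compared against more later rows - so giving the first workers smaller slices is one of the configuration parameters worth tuning.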