Search code examples
parallel-processinggpusparse-matrixcosine-similaritygnu-parallel

Parallel Cosine similarity of two large files with each other


I have two files: A and B

A has 400,000 lines each having 50 float values
B has 40,000 lines having 50 float values.

For every line in B, I need to find corresponding lines in A which have >90% similarity (cosine).

For linear search and computation, the code takes ginormous computing time. (40-50 hours)

Reaching out to the community for suggestions on how to fasten the process (link of blogs/resources such as AWS/Cloud to be used to achieve it). Have been stuck with this for quite a while!

[There were mentions of rpud/rpudplus to do it, but can't seem to perform them on cloud resources]

N.B. as requested, the code for cosine similarity is:

for line1, line2 in zip(f1, f2):
    line1 = line1[:-1]
    cnt = cnt + 1
    l2X = [float(i) for i in line2.split()]
    f3 = open(enLabelValues, 'r')
    f4 = open(enVectorValues, 'r')
    print cnt
    cnt_j = 0
    for line3, line4 in zip(f3, f4):
        line3 = line3[:-1]
        l4X = [float(i) for i in line4.split()]
        ########This is the line for spatial cosine similarity
        result = 1 - spatial.distance.cosine(l2X, l4X)
        cnt_j = cnt_j + 1
        if(result > float(0.95)):
            if line3 not in a.keys():
                a[line3] = True
                fLabel_2.write(line3+"\n")
                fX_2.write(line4)
        fLabel_2.flush()
        fX_2.flush()
        os.fsync(fLabel_2.fileno())
        os.fsync(fX_2.fileno())

Solution

  • I can generate synthetic files of 40,000 and 400,000 lines with 50 samples per line and process them in around 2 minutes 18 seconds on a reasonable 4 core (+hyperthreading) desktop iMac in my clumsy style of C++ without any SIMD optimisation (by me) using GNU Parallel.

    Here is the top-level script. You can see it generates the test data in "a.txt" and "b.txt". Then it "compresses" "b.txt" to an identical binary representation, with the pre-computed magnitude appended to each line. Finally, it numbers the lines in "a.txt" and passes them into GNU Parallel which splits the lines into groups of around 5,200 lines and starts a group of 8 parallel processes to compare each of those lines with the 40,000 lines in B.

    #!/bin/bash
    
    # Generate test data - a.txt b.txt
    ./generate
    
    # Preprocess b.txt into binary with precomputed magitudes save as B
    ./preprocess
    
    # Process file A in batches
    cat -n a.txt | parallel --block-size 2M --line-buffer --pipe ./process {#}
    

    Here is the generate.cpp program to synthesise data:

    #include <iostream>
    #include <cstdlib>
    #include <fstream>
    #include "common.h"
    
    using namespace std;
    
    int main()
    {
       int line,sample;
       ofstream a("a.txt");
       if (!a.is_open()){
          cerr << "ERROR: Unable to open output file";
          exit(EXIT_FAILURE);
       }
       for(line=0;line<ALINES;line++){
          for(sample=0;sample<SAMPLESPERLINE;sample++){
             a << (float)rand()*100/RAND_MAX << " ";
          }
          a << endl;
       }
       a.close();
       ofstream b("b.txt");
       if (!b.is_open()){
          cerr << "ERROR: Unable to open output file";
          exit(EXIT_FAILURE);
       }
       for(line=0;line<BLINES;line++){
          for(sample=0;sample<SAMPLESPERLINE;sample++){
             b << (float)rand()*100/RAND_MAX << " ";
          }
          b << endl;
       }
       b.close();
    }
    

    Here is the preprocess.cpp code:

    #include <sstream>
    #include <fstream>
    #include <string>
    #include <iostream>
    #include <stdlib.h>
    #include <vector>
    #include <cmath>
    #include "common.h"
    
    int main(int argc, char* argv[]){
    
       std::ifstream btxt("b.txt");
       std::ofstream bbin("B",std::ios::out|std::ios::binary);
       if (!btxt.is_open()){
          std::cerr << "ERROR: Unable to open input file";
          exit(EXIT_FAILURE);
       }
       if (!bbin.is_open()){
          std::cerr << "ERROR: Unable to open output file";
          exit(EXIT_FAILURE);
       }
    
       int l=0;
       std::string line;
       std::vector<float> v;
       v.resize(SAMPLESPERLINE+1);
       while (std::getline(btxt,line)){
          std::istringstream iss(line);
          v.clear();
          float f;
          double magnitude;
          magnitude=0.0;
          int s=0;
          while (iss >> f){
             v[s]=(f);
             magnitude+=(double)f*f;
             s++;
          }
          // Append the magnitude to the end of the "line"
          v[s]=(float)sqrt(magnitude);
          // Write the samples and magnitide in binary to the output file
          bbin.write(reinterpret_cast<char*>(&v[0]),(SAMPLESPERLINE+1)*sizeof(float));
          l++;
       }
       btxt.close();
       bbin.close();
    
       return EXIT_SUCCESS;
    }
    

    Here is the common.h file:

    const int ALINES=400000;
    const int BLINES=40000;
    const int SAMPLESPERLINE=50;
    

    And here is the process.cpp code:

    #include <sstream>
    #include <fstream>
    #include <string>
    #include <iostream>
    #include <stdlib.h>
    #include <vector>
    #include <array>
    #include <cmath>
    #include "common.h"
    
    int main(int argc, char* argv[]){
    
       if(argc!=2){
          std::cerr << "Usage: process JOBNUM" << std::endl;
          exit(1);
       }
       int JobNum=std::atoi(argv[1]);
       std::cerr << "Starting job: " << JobNum << std::endl;
    
       // Load B
       std::ifstream bbin("B",std::ios::binary);
       if (!bbin.is_open()){
          std::cerr << "ERROR: Unable to open B";
          exit(EXIT_FAILURE);
       }
    
       int l=0;
       std::array<float,SAMPLESPERLINE+1> record;
       std::vector<std::array<float,SAMPLESPERLINE+1>> B;
       B.resize(BLINES);
       for(l=0;l<BLINES;l++){
          // Read one record of 50 floats and their magnitude
          bbin.read(reinterpret_cast<char*>(&B[l][0]),sizeof(float)*(SAMPLESPERLINE+1));
       }
       bbin.close();
    
       // Process all lines read from stdin, each line prepended by its line number
       // Format is:
       // <line number in file "a.txt"> <SAMPLE0> <SAMPLE1> ... <SAMPLE49>
       int nLines=0;
       std::string line;
       while (std::getline(std::cin,line)){
          nLines++;
          std::istringstream iss(line);
          std::vector<float> A;
          A.resize(SAMPLESPERLINE);
          float f;
          int Alineno;
          int s=0;
          iss >> Alineno;
          double dMag=0.0;
          while (iss >> f){
             A[s++]=f;
             dMag+=(double)f*f;
          }
          // Root magnitude
          float AMagnitude=(float)sqrt(dMag);
    
          // At this point we have in B, 40,000 records each of 50 samples followed by the magnitude
          // ... and we have a single record from "a.txt" with 50 samples and its magnitude in AMagnitude
          // ... and Alineno is the absolute line number in "a.txt" of this line
          // Time to do the actual calculation: compare this record to all records in B
          for(int brec=0;brec<BLINES;brec++){
             float BMagnitude=B[brec][SAMPLESPERLINE];
             double dotproduct=0.0;
             float *a = &A[0];
             float *b = &B[brec][0];
             for(s=0;s<SAMPLESPERLINE;s++){
                dotproduct += (*a++) * (*b++);
             }
             float similarity = dotproduct/(AMagnitude*BMagnitude);
             if(similarity>0.99){
                std::cout << "Line A: " << Alineno << ", line B: " << brec << ", similarity:" << similarity << std::endl;
             }
          }
       }
       std::cerr << "Ending job: " << JobNum << ", processed " << nLines << " lines" << std::endl;
    
       return EXIT_SUCCESS;
    }
    

    The Makefile is pretty simple:

    CFLAGS= -std=c++11 -O3 -march=native
    
    all:    generate preprocess process
    
    generate:   generate.cpp
            clang++ ${CFLAGS} generate.cpp -o generate
    
    preprocess: preprocess.cpp
            clang++ ${CFLAGS} preprocess.cpp -o preprocess
    
    process:    process.cpp
            clang++ ${CFLAGS} process.cpp -o process
    

    When you run it, it pegs the CPU for 2 minutes and looks like this:

    time ./go
    Starting job: 3
    Starting job: 7
    Starting job: 8
    Starting job: 2
    Starting job: 5
    Starting job: 1
    Starting job: 4
    Starting job: 6
    Ending job: 1, processed 5204 lines
    Starting job: 9
    Ending job: 2, processed 5203 lines
    Ending job: 3, processed 5204 lines
    Starting job: 11
    Starting job: 10
    Ending job: 4, processed 5204 lines
    Starting job: 12
    Ending job: 5, processed 5203 lines
    Ending job: 6, processed 5203 lines
    Starting job: 14
    Starting job: 13
    ...
    ...
    Starting job: 75
    Ending job: 68, processed 5204 lines
    Ending job: 69, processed 5203 lines
    Starting job: 76
    Starting job: 77
    Ending job: 70, processed 5203 lines
    Ending job: 71, processed 5204 lines
    Ending job: 72, processed 5203 lines
    Ending job: 77, processed 4535 lines
    Ending job: 74, processed 5204 lines
    Ending job: 73, processed 5205 lines
    Ending job: 75, processed 5204 lines
    Ending job: 76, processed 5203 lines
    
    real    2m17.510s
    user    16m24.533s
    sys     0m4.426s
    

    Note that I have not done any explicit SIMD or loop-unrolling or used any intrinsics to form the dot-product. I suspect if you asked a question about forming a dot-product and tagged it with simd or avx, someone would help you optimise it.


    Note also that you could easily run this code across multiple computers with GNU Parallel, assuming you have ssh login to them, just using:

    parallel -S host1,host2,host3 ....
    

    For example, I have a 6-core Debian PC on my network, so I ran the above code parallelised across my 4-core Mac and 6-core Debian machine with:

    parallel -S :,debian ...
    

    and it then takes 1 minute 8 seconds.