Search code examples
pythonhadoopscikit-learnpearson-correlationbigdata

Compute a Pairwise Distance Matrix: is a scalable, big-data-ready approach available in Python?


I have a CSV file containing feature values for items: each row is a triple (id_item, id_feature, value) representing the value of a specific feature for a specific item. The data is very sparse.

I need to compute two item distance matrixes, one using Pearson correlation as metric and the other using the Jaccard index.

At the moment I implemented an in-memory solution and I do something like this:

import numpy as np
from numpy import genfromtxt
from scipy.sparse import coo_matrix
from scipy.sparse import csr_matrix
from scipy.stats.stats import pearsonr
import sklearn.metrics.pairwise
import scipy.spatial.distance as ds
import scipy.sparse as sp

# read the data
my_data = genfromtxt('file.csv', delimiter=',')
i,j,value=my_data.T

# create a sparse matrix
m=coo_matrix( (value,(i,j)) )

# convert in a numpy array
m = np.array(m.todense())

# create the distance matrix using pdist
d = ds.pdist(m.T, 'correlation')

d= ds.squareform(d)

it works well and it's pretty fast but it is not scalable horizontally. I would like to be able to increase the performances just by adding nodes to a cluster and that everything could work even in a big data scenario, again just by adding nodes. I don't care if the process takes hours; distances need to be updated once a day.

What's the best approach?

1) Sklearn pairwise_distances has a n_jobs parameter that allows to take advantage of parallel computing (http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.pairwise_distances.html) but as far as I know it supports multiple cores on the same machine and not cluster computing. This is a related question Easy way to use parallel options of scikit-learn functions on HPC but I didn't get what is the best solution in my specific case and if Joblib actually has issues.

Also, the part that reads in memory the CSV would still be a bottleneck: I can store the CSV in HDFS and read it doing something like:

import subprocess
cat = subprocess.Popen(["hadoop", "fs", "-cat", "data.csv"], stdout=subprocess.PIPE)

and then loop through cat.stdout:

for line in cat.stdout:
    ....

but I am not sure it is a good solution.

2) Store data in HDFS, implement computation in a map reduce fashion and run the job via mrjob

3) Store data in HDFS, implement the computation in a SQL-like fashion (I don't know if it is easy and feasible, I have to think about it) and run it using PyHive

Of course I would like to keep as much as possible the current code, so a variant of the solution 1) is the best one for me.


Solution

  • To prototype:

    I suggest you to use Pyro4 and to implement that with divide and conquer paradigm, a master node and several slave nodes.

    If you have n items you have n(n-1)/2 pairs, you use sklearn pairwise distances with maximum of jobs (n_jobs parameter) on each node.

    You split your set of pairs in a tasks and execute that on b nodes and regroup result on your master node.

    For production:

    I advice you PySpark 2.1.1. Map Reduce becomes deprecated.