python, python-3.x, pandas, multiprocessing, sklearn-pandas

What is the most efficient way to apply multiprocessing to unique categories of entries in a pandas dataframe?


I have a large dataset (tsv) that looks something like this:

category     lat         lon  
apple        34.578967   120.232453  
apple        34.234646   120.535667  
pear         32.564566   120.453567  
peach        33.564567   121.456445  
apple        34.656757   120.423566  

The overall goal is to pass a dataframe containing all records for a single category to DBSCAN to generate cluster labels, and to do this for all categories using the multiprocessing module. I can get this to work, but I'm currently reloading the entire dataset within each process in order to subset to the category, because I keep getting errors when I try to reference the entire dataset as a global variable. The code looks like this:

import pandas as pd
from sklearn.cluster import DBSCAN
import multiprocessing as mp

def findClusters(inCat):
    inTSV = r"C:\trees.csv"
    clDF = pd.read_csv(inTSV, sep='\t')
    catDF = clDF[clDF['category'] == inCat].copy()  # compare against the argument, not the string 'inCat'
    kms = 0.05
    scaleDist = 0.01*kms
    x = 'lon'
    y = 'lat'
    dbscan = DBSCAN(eps=scaleDist, min_samples=5)
    clusters = dbscan.fit_predict(catDF[[x,y]])
    catDF['cluster'] = clusters
    catDF.to_csv(r"C:\%s.csv" % (inCat))
    del catDF

if __name__ == "__main__":

    inTSV = r"C:\trees.csv"
    df = pd.read_csv(inTSV, sep='\t')

    catList = list(df.category.unique())

    cores = mp.cpu_count()
    pool = mp.Pool(cores - 1)
    pool.map(findClusters, catList)
    pool.close()
    pool.join()

I know this isn't the most efficient way to do this, as I am rereading the input and also writing out to intermediate files. I want to run the clustering of each category of data in parallel. Can I build a list of dataframes (one for each category) that feeds the multiprocessing pool? How would the results be collected after processing (wrapped in a concat call?). Or is there a better way to load the data into memory once and let each process slice out the category data it needs, and if so, how?

Running Anaconda, Python 3.5.5

Thanks for any insight.


Solution

  • You can use df.groupby, which splits the frame into one sub-dataframe per category. Note:

    In [1]: import pandas as pd
    
    In [2]: df = pd.read_clipboard()
    
    In [3]: df
    Out[3]:
      category        lat         lon
    0    apple  34.578967  120.232453
    1    apple  34.234646  120.535667
    2     pear  32.564566  120.453567
    3    peach  33.564567  121.456445
    4    apple  34.656757  120.423566
    
    In [4]: list(df.groupby('category'))
    Out[4]:
    [('apple',   category        lat         lon
      0    apple  34.578967  120.232453
      1    apple  34.234646  120.535667
      4    apple  34.656757  120.423566),
     ('peach',   category        lat         lon
      3    peach  33.564567  121.456445),
     ('pear',   category        lat         lon
      2     pear  32.564566  120.453567)]
    

    And just rewrite your function to accept a (category, dataframe) pair, something like:

    def find_clusters(grouped):
        cat, catDF = grouped
        kms = 0.05
        scale_dist = 0.01*kms
        x = 'lon'
        y = 'lat'
        dbscan = DBSCAN(eps=scale_dist, min_samples=5)
        clusters = dbscan.fit_predict(catDF[[x,y]])
        catDF['cluster'] = clusters
        catDF.to_csv(r"C:\%s.csv" % (cat))
    

    Honestly, writing to intermediate files is fine, I think.

    If not, you can always just do:

    return catDF
    

    Instead of

    catDF.to_csv(r"C:\%s.csv" % (cat))
    

    And then:

    df = pd.concat(pool.map(find_clusters, list(df.groupby('category'))))
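
Putting the pieces above together, a minimal end-to-end sketch might look like the following. It loads the data once, splits it with groupby, sends each (category, dataframe) pair to a worker, and concatenates the returned frames. The helper name `cluster_by_category` and the small in-memory frame (built from the sample rows in the question, standing in for the TSV file) are assumptions made for illustration; the `eps` and `min_samples` values are the ones from the question and are applied to raw degree coordinates.

```python
import multiprocessing as mp

import pandas as pd
from sklearn.cluster import DBSCAN


def find_clusters(grouped):
    """Cluster one category; `grouped` is a (name, sub-DataFrame) pair from groupby."""
    cat, cat_df = grouped
    # Copy so the function does not modify its input when called directly.
    cat_df = cat_df.copy()
    # Parameters taken from the question (eps = 0.01 * 0.05, in degrees).
    dbscan = DBSCAN(eps=0.01 * 0.05, min_samples=5)
    cat_df["cluster"] = dbscan.fit_predict(cat_df[["lon", "lat"]])
    return cat_df


def cluster_by_category(df, processes=None):
    """Hypothetical helper: cluster every category in parallel, return one frame."""
    groups = list(df.groupby("category"))  # one (name, sub-DataFrame) pair per category
    with mp.Pool(processes) as pool:
        parts = pool.map(find_clusters, groups)
    # Restore the original row order after concatenating the per-category results.
    return pd.concat(parts).sort_index()


if __name__ == "__main__":
    # Stand-in for pd.read_csv(..., sep='\t'): the sample rows from the question.
    df = pd.DataFrame(
        {
            "category": ["apple", "apple", "pear", "peach", "apple"],
            "lat": [34.578967, 34.234646, 32.564566, 33.564567, 34.656757],
            "lon": [120.232453, 120.535667, 120.453567, 121.456445, 120.423566],
        }
    )
    result = cluster_by_category(df)
    print(result)
```

With only the five sample rows, no category reaches `min_samples=5`, so every row is labelled as noise (-1); real clusters only appear on the full dataset. Each sub-dataframe is pickled and sent to a worker, so this trades some serialization overhead for not rereading the file in every process.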