Tags: python, pandas, multiprocessing, data-science, python-multiprocessing

Pandas easy parallel/multiprocess calculations


I am looking for a fast and easy-to-use solution for parallel calculations with pandas. I know this is an important topic for data science, but I have not found anything that is easy to use, much faster than the standard pandas df.apply, and quick to implement!

so...

Let's have a quick overview of the available tools/frameworks out there. Of course, I am leaving asyncio aside, since it does not directly deal with my topic.

Dask

Please find a good article at https://towardsdatascience.com/how-i-learned-to-love-parallelized-applies-with-python-pandas-dask-and-numba-f06b0b367138 or directly on the Dask website: http://docs.dask.org/en/latest/use-cases.html

Find below a snippet which did not work well for me, but which gives a pretty good idea of the implementation:

from multiprocessing import cpu_count

from dask import dataframe as dd

cores = cpu_count()

# one partition per core; each partition runs a plain pandas apply,
# and compute() spreads the partitions over a process pool
dd.from_pandas(my_df, npartitions=cores).\
   map_partitions(
      lambda df: df.apply(
         lambda x: nearest_street(x.lat, x.lon), axis=1)).\
   compute(scheduler="processes")

Personally, I find this implementation very painful (ok, maybe I'm a lazy man), but overall I found it not very fast, sometimes slower than the old-fashioned df[feature] = df.feature.apply(my_funct)


Multiprocessing

Find below a snippet of code that makes it easy to run a multi-process task, but ... with HDD I/O. This implementation may or may not work as-is, but it gives a very good idea of the code implementation:

import os
from math import ceil
from multiprocessing import Process, cpu_count

import numpy as np
import pandas as pd
from tqdm import tqdm


def chunks(l, n):
    # split the indices of l into n contiguous (start, stop) pairs
    bounds = [ceil(i) for i in np.linspace(0, len(l), n + 1)]
    return list(zip(bounds[:-1], bounds[1:]))

def my_funct(i0=0, i1=None):
    # process one slice of the unique features and dump each result to disk
    for n in tqdm(features[i0:i1]):
        _df = df.loc[df.feature == n, :]
        _df = do_something_complex(_df)
        _df.to_csv(f"{my_path}/feat-{n}.csv", index=False)


# multiprocessing
cores = cpu_count()
features = df.feature.unique()
if cores < 2:
    my_funct(i0=0, i1=len(features))
else:
    chks = chunks(features, cores)
    process_list = [Process(target=my_funct, args=chk)
                    for chk in chks]
    for p in process_list:
        p.start()
    for p in process_list:
        p.join()

# join the files and merge them into our new_df
frames = []
for filename in os.listdir(my_path):
    frames.append(pd.read_csv(f'{my_path}/{filename}'))
    os.remove(f'{my_path}/{filename}')
new_df = pd.concat(frames, ignore_index=True)

Ok, this implementation is overkill, but 1/ it works most of the time, 2/ it is easy to understand, and 3/ it is faster than df = df.apply(my_funct) and -- sometimes -- faster than Dask.
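For reference, a lighter variant of the same idea skips the HDD I/O and lets a multiprocessing.Pool pickle the result chunks back to the parent process. This is only a sketch with made-up stand-ins for df and the per-row function (here again called my_funct), not the exact code above:

import numpy as np
import pandas as pd
from multiprocessing import Pool, cpu_count

df = pd.DataFrame({"lat": np.random.rand(100_000),
                   "lon": np.random.rand(100_000)})

def my_funct(row):
    # stand-in for the expensive per-row work (e.g. nearest_street)
    return (row.lat ** 2 + row.lon ** 2) ** 0.5

def apply_chunk(chunk):
    # plain pandas apply on one chunk, executed in a worker process
    return chunk.apply(my_funct, axis=1)

if __name__ == "__main__":
    cores = cpu_count()
    parts = np.array_split(df, cores)            # one DataFrame chunk per core
    with Pool(cores) as pool:
        results = pool.map(apply_chunk, parts)   # chunks are pickled to the workers
    df["result"] = pd.concat(results)            # reassemble, index order preserved

Compared with the Process + CSV version above, the result chunks travel back through pickling instead of the disk, so this only pays off when the per-row work dominates the serialisation cost.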

BUT ... assuming that, statistically, I cannot be the only/first one to deal with such a topic...

Could you please help me? Is there any solution out there? Is there something like:

  • df.multi_process_apply(my_funct) or
  • df.parallel_apply(my_funct)

Thanks a lot!


Solution

  • You can try Pandarallel.

    DISCLAIMER: I am the author of this lib (which is still under development, but you can already achieve good results with it).

    Without parallelisation: [image]

    With parallelisation: [image]

    Just replace df.apply(func) with df.parallel_apply(func) and all your CPUs will be used.
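    A minimal usage sketch, assuming pandarallel is installed (pip install pandarallel); the DataFrame and func below are just placeholders for your own data and row-wise function:

    import math
    import pandas as pd
    from pandarallel import pandarallel

    # start the worker processes (one per core by default)
    pandarallel.initialize()

    df = pd.DataFrame({"a": range(100_000), "b": range(100_000)})

    def func(row):
        # any row-wise computation; stands in for nearest_street(x.lat, x.lon)
        return math.hypot(row.a, row.b)

    # same call signature as df.apply(func, axis=1), but spread over all cores
    result = df.parallel_apply(func, axis=1)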