Search code examples
pythonpandasmultiprocessingpython-multiprocessingdask

why is multiprocessing slower than a simple computation in Pandas?


This is related to how to parallelize many (fuzzy) string comparisons using apply in Pandas?

Consider this simple (but funny) example again:

import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd

master= pd.DataFrame({'original':['this is a nice sentence',
'this is another one',
'stackoverflow is nice']})

slave= pd.DataFrame({'name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'stackoverflow is nice'],'my_value': [1,2,3,4,5]})

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
    #return my_value corresponding to the highest score
    return slave_df.loc[slave_df.score.idxmax(),'my_value']

master
Out[39]: 
                  original
0  this is a nice sentence
1      this is another one
2    stackoverflow is nice

slave
Out[40]: 
   my_value                      name
0         1               hello world
1         2           congratulations
2         3  this is a nice sentence 
3         4       this is another one
4         5     stackoverflow is nice

What I need to do is simple:

  • For every row in master, I lookup into the Dataframe slave for the best match using the string similarity score computed by fuzzywuzzy.

Now let's make these dataframes a bit bigger:

master = pd.concat([master] * 100,  ignore_index  = True)
slave = pd.concat([slave] * 10,  ignore_index  = True)

First, I have tried with dask

#prepare the computation
dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))

and now here are the timings:

#multithreaded
%timeit dmaster.compute(get=dask.threaded.get) 
1 loop, best of 3: 346 ms per loop

#multiprocess
%timeit dmaster.compute(get=dask.multiprocessing.get) 
1 loop, best of 3: 1.93 s per loop

#good 'ol pandas
%timeit master['my_value'] = master.original.apply(lambda x: helper(x,slave))
100 loops, best of 3: 2.18 ms per loop

Second, I have tried with the good old multiprocessing package

from multiprocessing import Pool, cpu_count

def myfunc(df):
    return df.original.apply(lambda x: helper(x, slave))

from datetime import datetime

if __name__ == '__main__':
     startTime = datetime.now()
     p = Pool(cpu_count() - 1)
     ret_list = p.map(myfunc, [master.iloc[1:100,], master.iloc[100:200 ,],
                               master.iloc[200:300 ,]])
     results = pd.concat(ret_list)
     print datetime.now() - startTime

which gives about the same time

runfile('C:/Users/john/untitled6.py', wdir='C:/Users/john')
0:00:01.927000

Question: why is multiprocessing with both Dask and multiprocessing so slow compared to Pandas here? Assume my real data is much bigger than that. Could I get a better outcome?

After all, the problem I consider here is embarassingly parallel (every row is an independent problem), so these packages should really shine.

Am I missing something here?

Thanks!


Solution

  • Let me summarize the comments I made into something like an answer. I hope this information proves useful, as there are a number of issues rolled into one here.

    First, I would like to point you to distributed.readthedocs.io/en/latest/efficiency.html , where a number of dask performance topics are discussed. Note that this is all in terms of the distributed scheduler, but since that can be started in-process, with threads or processes, or a combination of these, it really does supercede the previous dask schedulers, and is generally recommended in all cases.

    1) It takes time to create processes. This is always true, and particularly true on windows. You will want to create the processes only once, with its fixed overhead, and run many tasks, if you are interested in real-life performance. In dask there are many ways of making your cluster, even locally.

    2) Every task that dask (or any other dispatcher) handles incurs some overhead. In the case of the distributed scheduler, this is <1ms, but in the case where the runtime of the task itself is very short, this can be significant.

    3) It is an anti-pattern in dask to load the whole dataset in the client and pass it to the worker(s). You want, instead, to use functions like dask.dataframe.read_csv, where the data is loaded by the workers, avoiding expensive serialization and inter-process communication. Dask is really good at moving the computation to where the data is, minimizing communication.

    4) When communication between processes, the method of serialization matters, which is my guess at why non-dask multiprocessing is so slow for you.

    5) Finally, not all jobs will find gains in performance under dask. This depends on a number of things, but often the main one is: does the data comfortably fit in memory? If yes, it may be hard to match the well-optimized methods in numpy and pandas. As always, you should always profile your code...