Tags: python, pandas, parallel-processing, typeerror, process-pool

How to pass variables to ProcessPoolExecutor, got type error


I'm trying to modify my code to apply parallelism with ProcessPoolExecutor. From what I read online, I should use the map() function to pass my variables to the function, but it throws a TypeError saying it needs iterables. Should I modify the method so that it only takes one row, and then feed the variables through a for loop?

The error:

    186 def _get_chunks(*iterables, chunksize):
    187     """ Iterates over zip()ed iterables in chunks. """
--> 188     it = zip(*iterables)
    189     while True:
    190         chunk = tuple(itertools.islice(it, chunksize))

TypeError: 'float' object is not iterable
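
The error is raised before the worker function ever runs: executor.map zips all of its positional arguments together, so every argument after the function must itself be an iterable. zip can walk the DataFrame (iterating a DataFrame yields its column labels, not its rows) and the string 'title' (character by character), but it fails the moment it tries to iterate the float 0.7. A minimal reproduction, independent of the executor:

    # zip() calls iter() on each argument up front; a bare float fails immediately
    zip([1, 2, 3], 'title', 0.7)
    # TypeError: 'float' object is not iterable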

My code:

with ProcessPoolExecutor() as executor:
    df = executor.map(drop_dupplicate_rows, df,'title',0.7,chunksize=100)

def drop_dupplicate_rows(_df,_column,_cuttoff):
  '''
  Drop duplicate rows based on the difflib library.

  Parameters
  ----------
  _df: DataFrame
    the data frame
  _column: string
    the column name to be checked for duplicates
  _cuttoff: float
    the similarity cutoff (between 0 and 1) above which rows are considered duplicates and dropped
  Returns
  ----------
  the cleaned dataframe
  '''
  for k,row in enumerate(_df[f'{_column}']):
    repetition_list=list()    
    if not np.where(_df[f'{_column}']==''):
        repetition_list=difflib.get_close_matches(row,_df[f'{_column}'],cutoff=_cuttoff)
        if len(repetition_list)>=2:
            print(f'row:{k},link: {_df.at[k,"url"]} due to: {_column} repetition_list: ',repetition_list)
            _df.drop(index=k,inplace=True)
    _df.reset_index(drop=True, inplace=True)
  return _df

Solution

  • I'm not sure of the implementation of your function. However, you can split the dataframe into chunks yourself and use multiprocessing with functools.partial to bind the scalar arguments:

    from functools import partial
    from multiprocessing import Pool

    import pandas as pd

    def drop_dupplicate_rows(_df, _column, _cuttoff):
        # your code
        ...

    # protect the entry point
    if __name__ == '__main__':
        # Load your dataframe here
        # df = ...

        CHUNKSIZE = 100
        # reset each chunk's index so positional lookups inside the function still work
        chunks = (df[i:i+CHUNKSIZE].reset_index(drop=True)
                  for i in range(0, len(df), CHUNKSIZE))

        # create the multiprocessing pool (4 worker processes)
        with Pool(4) as pool:
            partial_func = partial(drop_dupplicate_rows, _column='title', _cuttoff=0.7)
            out = pool.map(partial_func, chunks)

        # pool.map returns a list of cleaned chunks; stitch them back together
        df = pd.concat(out, ignore_index=True)
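
If you'd rather stay with ProcessPoolExecutor, the same idea works there: every argument to map must be an iterable, so wrap the scalar arguments with itertools.repeat, which yields the same value indefinitely while zip stops once the chunks run out. A minimal sketch under the same assumptions (drop_dupplicate_rows defined as above, df already loaded):

    from concurrent.futures import ProcessPoolExecutor
    from itertools import repeat

    import pandas as pd

    if __name__ == '__main__':
        # df = ...  (load your dataframe here)

        CHUNKSIZE = 100
        chunks = [df[i:i+CHUNKSIZE].reset_index(drop=True)
                  for i in range(0, len(df), CHUNKSIZE)]

        with ProcessPoolExecutor() as executor:
            # repeat('title') and repeat(0.7) pair the same scalar with every chunk
            out = list(executor.map(drop_dupplicate_rows,
                                    chunks, repeat('title'), repeat(0.7)))

        df = pd.concat(out, ignore_index=True)

One caveat with either approach: since each worker only sees its own chunk, duplicates are detected within a chunk but never across chunks.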