I'm trying to modify my code to apply parallelism with ProcessPoolExecutor. As I read on the internet, I should use the map() function to pass my variables to the function, but it throws a TypeError saying it needs iterables. Should I modify the method so it only gets one row at a time and then feed the variables through a for loop?
The error:
186 def _get_chunks(*iterables, chunksize):
187 """ Iterates over zip()ed iterables in chunks. """
--> 188 it = zip(*iterables)
189 while True:
190 chunk = tuple(itertools.islice(it, chunksize))
TypeError: 'float' object is not iterable
My code:
from concurrent.futures import ProcessPoolExecutor
import difflib
import numpy as np

with ProcessPoolExecutor() as executor:
    df = executor.map(drop_dupplicate_rows, df, 'title', 0.7, chunksize=100)
def drop_dupplicate_rows(_df, _column, _cuttoff):
    '''
    Drop duplicate rows based on the difflib library.

    Parameters
    ----------
    _df: DataFrame
        the data frame
    _column: string
        the column name to be checked for duplicates
    _cuttoff: float
        the similarity cutoff (between 0 and 1) above which rows are considered duplicates and dropped

    Returns
    ----------
    the cleaned DataFrame
    '''
    for k, row in enumerate(_df[f'{_column}']):
        repetition_list = list()
        if not np.where(_df[f'{_column}'] == ''):
            repetition_list = difflib.get_close_matches(row, _df[f'{_column}'], cutoff=_cuttoff)
        if len(repetition_list) >= 2:
            print(f'row: {k}, link: {_df.at[k, "url"]} due to: {_column} repetition_list: ', repetition_list)
            _df.drop(index=k, inplace=True)
    _df.reset_index(drop=True, inplace=True)
    return _df
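For context: executor.map treats every positional argument after the function as an iterable and zips them together, which is why the scalar 0.7 fails inside zip(*iterables). A minimal sketch, assuming df and drop_dupplicate_rows are defined as above, of passing the constant arguments as iterables with itertools.repeat:

import itertools
from concurrent.futures import ProcessPoolExecutor

if __name__ == '__main__':
    # df is assumed to be loaded here, as in the question
    CHUNKSIZE = 100
    # reset each chunk's index so the function's 0-based drop/at lookups still work
    chunks = [df[i:i + CHUNKSIZE].reset_index(drop=True)
              for i in range(0, len(df), CHUNKSIZE)]
    with ProcessPoolExecutor() as executor:
        # every positional argument to map() must be an iterable of the same length;
        # itertools.repeat supplies the constant column name and cutoff for each chunk
        results = list(executor.map(drop_dupplicate_rows,
                                    chunks,
                                    itertools.repeat('title'),
                                    itertools.repeat(0.7)))

zip() stops at the shortest iterable, so the two repeat() streams are only consumed once per chunk.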
I'm not sure of the implementation of your function. However, you can use multiprocessing like this:
from functools import partial
from multiprocessing import Pool

def drop_dupplicate_rows(_df, _column, _cuttoff):
    # your code
    ...

# protect the entry point
if __name__ == '__main__':
    # Load your dataframe here
    # df = ...
    CHUNKSIZE = 100
    # split the dataframe into chunks of CHUNKSIZE rows
    chunks = (df[i:i + CHUNKSIZE] for i in range(0, len(df), CHUNKSIZE))
    # create the multiprocessing pool (4 worker processes)
    with Pool(4) as pool:
        # fix the constant arguments so each worker call only receives a chunk
        partial_func = partial(drop_dupplicate_rows, _column='title', _cuttoff=0.7)
        out = pool.map(partial_func, chunks)
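Note that pool.map returns a list of per-chunk DataFrames, so the results still have to be stitched back together, and duplicates that span two different chunks will not be caught because each worker only sees its own chunk. A minimal follow-up, assuming pandas is imported as pd:

import pandas as pd

# recombine the cleaned chunks into a single DataFrame
df_cleaned = pd.concat(out, ignore_index=True)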