I have a function with three parameters: iterable_token, dataframe, and label_array. However, only iterable_token is iterated over inside the function.
def cross_tab(label, token_presence):
    A_token = 0
    B_token = 0
    C_token = 0
    D_token = 0
    for i, j in zip(list(label), list(token_presence)):
        if i == True and j == True:
            A_token += 1
        elif i == False and j == False:
            D_token += 1
        elif i == True and j == False:
            C_token += 1
        elif i == False and j == True:
            B_token += 1
    return A_token, B_token, C_token, D_token
def My_ParallelFunction(iterable_token, dataframe, label_array):
    A = {}
    B = {}
    C = {}
    D = {}
    token_count = {}
    token_list = []
    token_presence_sum = 0
    i = 0
    for token in iterable_token:
        try:
            token_presence = dataframe['Master'].str.contains('\\b' + token + '\\b')
            token_presence_sum = sum(token_presence)
            if token_presence_sum:
                A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
                A[token] = A_token
                B[token] = B_token
                C[token] = C_token
                D[token] = D_token
                token_count[token] = token_presence_sum
                token_list.append(token)
        except Exception as e:
            pass
    return (A, B, C, D, token_count, token_list)
How do I parallelize the My_ParallelFunction function?
Edit 1: I tried the method suggested in Example 1, because parallelizing the whole function is what I am looking for:
import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as p:
    results = p.starmap(My_ParallelFunction, (iterable_token, dataframe, label_array))
but the error message is:
RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 47, in starmapstar
return list(itertools.starmap(args[0], args[1]))
TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
"""
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
<timed exec> in <module>
/usr/lib/python3.6/multiprocessing/pool.py in starmap(self, func, iterable, chunksize)
272 `func` and (a, b) becomes func(a, b).
273 '''
--> 274 return self._map_async(func, iterable, starmapstar, chunksize).get()
275
276 def starmap_async(self, func, iterable, chunksize=None, callback=None,
/usr/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
642 return self._value
643 else:
--> 644 raise self._value
645
646 def _set(self, i, obj):
TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
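From the traceback, it looks like starmap() expects an iterable of argument tuples, where each tuple holds the arguments for one call of the function; passing (iterable_token, dataframe, label_array) directly makes the pool unpack the 949 tokens of iterable_token as the arguments of a single call. Wrapping the arguments in a list gets rid of the TypeError, although it makes exactly one call in one worker, so it gains no parallelism:

import multiprocessing as mp

# Each inner tuple is the complete argument list for ONE call of the
# function, so this runs a single call in a single worker process.
with mp.Pool(mp.cpu_count()) as p:
    results = p.starmap(My_ParallelFunction, [(iterable_token, dataframe, label_array)])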
Edit 2: Here is the file I am using. You can download it from here and unzip it. Also, run the script below to get the required input parameters. Make sure to install nltk, pandas, and numpy, and to change the path to the file TokenFile.csv.
from nltk import word_tokenize, sent_tokenize
import pandas as pd
import numpy as np

dataframe = pd.read_csv('/home/user/TokenFile.csv', nrows=100)

def get_uniquetoken(stop_words, input_doc_list):
    ## get unique words across all documents
    if stop_words:
        unique_words = [word for doc in input_doc_list for sent in sent_tokenize(doc) for word in word_tokenize(sent) if word not in stop_words]
    else:
        unique_words = [word for doc in input_doc_list for sent in sent_tokenize(doc) for word in word_tokenize(sent)]
    unique_words = set(unique_words)
    print('unique_words done! length is:', len(unique_words))
    return unique_words

input_token_list = dataframe['Master'].tolist()
label_array = dataframe['label_array'].tolist()
iterable_token = get_uniquetoken(None, input_token_list)
Edit 3: This is the solution I am using:
def My_ParallelFunction(iterable_token, dataframe, label_array):
    A = {}
    B = {}
    C = {}
    D = {}
    token_count = {}
    token_list = []
    i = 0
    with mp.Pool(4) as p:
        token_result = p.starmap(_loop, [(token, dataframe, label_array, A, B, C, D, token_count, token_list) for token in iterable_token])
    return token_result  # (A, B, C, D, token_count, token_list)

def _loop(token, dataframe, label_array, A, B, C, D, token_count, token_list):
    try:
        token_presence = dataframe['Master'].str.contains('\\b' + token + '\\b')
        token_presence_sum = sum(token_presence)
        if token_presence_sum:
            A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
            A[token] = A_token
            B[token] = B_token
            C[token] = C_token
            D[token] = D_token
            token_count[token] = token_presence_sum
            token_list.append(token)
    except Exception as e:
        pass
    return A, B, C, D, token_count, token_list
However, it is not giving me the result I want. Instead of six aggregated collections, I get a 949 x 6 x different_sizes structure.
Here are two toy examples to show how you can parallelize a similar function.
First option: if you want to parallelize the whole function, you can do that using Pool.starmap(). starmap() works like map(), but lets you pass multiple arguments to the target function.
from multiprocessing import Pool
import time

# Example 1: simple function parallelization
def f(a, b, c, _list):
    x = a + b + c
    time.sleep(1)
    _list.append(x)
    return _list

inputs = [
    (1, 2, 3, ['a', 'b', 'c']),
    (1, 2, 3, ['d', 'e', 'f']),
    (1, 2, 3, ['x', 'y', 'z']),
    (1, 2, 3, ['A', 'B', 'C']),
]

start = time.time()
with Pool(4) as p:
    results = p.starmap(f, inputs)
end = time.time()

for r in results:
    print(r)
print(f'done in {round(end-start, 3)} seconds')
Output:
['a', 'b', 'c', 6]
['d', 'e', 'f', 6]
['x', 'y', 'z', 6]
['A', 'B', 'C', 6]
done in 1.084 seconds
Second option: if you want to parallelize only the for-loop inside the function, rewrite the loop body as a function and call it using Pool.map() or Pool.starmap().
# Example 2: a function calling a parallelized loop

# loop body
def g(_string):
    time.sleep(1)
    return _string + '*'

# outer function
def f(a, b, c, _list):
    x = a + b + c
    _list.append(str(x))
    # loop parallelization
    with Pool(4) as p:
        new_list = p.map(g, _list)
    return new_list

start = time.time()
result = f(1, 2, 3, ['a', 'b', 'c'])
end = time.time()

print(result)
print(f'done in {round(end-start, 3)} seconds')
Output:
['a*', 'b*', 'c*', '6*']
done in 1.048 seconds
Note that the "loop function" contains the logic to deal with a single element of the iterable; Pool.map() takes care of running it for all the elements.
The time.sleep(1) calls simulate a time-consuming calculation. If the parallelization works, you should be able to process 4 inputs in about 1 second rather than in 4 seconds.
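One caveat worth noting: these snippets assume a fork-based start method (the Linux default). On platforms where worker processes are spawned, such as Windows, the pool creation must sit behind a main-module guard so the workers can re-import the module safely; a minimal sketch:

from multiprocessing import Pool

def g(_string):
    return _string + '*'

if __name__ == '__main__':
    # Guarding the entry point prevents spawned child processes from
    # re-executing the pool-creating code when they import this module.
    with Pool(4) as p:
        print(p.map(g, ['a', 'b', 'c']))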
Here is an example using your code:
import multiprocessing as mp

def My_ParallelFunction(iterable_token, dataframe, label_array):
    with mp.Pool(4) as p:
        token_result = p.starmap(
            _loop,
            [(token, dataframe, label_array) for token in iterable_token]
        )
    return token_result

def _loop(token, dataframe, label_array):
    A = {}
    B = {}
    C = {}
    D = {}
    token_count = {}
    token_list = []
    try:
        token_presence = dataframe['Master'].str.contains('\\b' + token + '\\b')
        token_presence_sum = sum(token_presence)
        if token_presence_sum:
            A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
            A[token] = A_token
            B[token] = B_token
            C[token] = C_token
            D[token] = D_token
            token_count[token] = token_presence_sum
            token_list.append(token)
    except Exception as e:
        print(e)
    # Return outside the try/except so a failed token still yields a
    # (possibly empty) six-tuple instead of None.
    return A, B, C, D, token_count, token_list
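Since each call of _loop returns its own six-tuple, the pool's result is a list with one entry per token (the 949 x 6 structure mentioned in Edit 3) rather than six aggregated collections. If you need the original return shape, you can merge the per-token results once the pool has finished; a minimal sketch, with merge_results as a hypothetical helper name:

def merge_results(token_result):
    # Combine the per-token six-tuples back into the original
    # (A, B, C, D, token_count, token_list) shape.
    A, B, C, D, token_count, token_list = {}, {}, {}, {}, {}, []
    for a, b, c, d, count, tokens in token_result:
        A.update(a)
        B.update(b)
        C.update(c)
        D.update(d)
        token_count.update(count)
        token_list.extend(tokens)
    return A, B, C, D, token_count, token_list

Called as A, B, C, D, token_count, token_list = merge_results(My_ParallelFunction(iterable_token, dataframe, label_array)), this restores the interface of the original sequential function.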