Tags: python, python-multiprocessing

Python multiprocessing Pool in a function with multiple input parameters but only one iterable


I have a function with multiple parameters: iterable_token, dataframe, and label_array. However, only iterable_token is iterated over inside the function.

def cross_tab(label, token_presence):
    # 2x2 contingency counts between the label and token presence
    A_token = 0  # label True,  token present
    B_token = 0  # label False, token present
    C_token = 0  # label True,  token absent
    D_token = 0  # label False, token absent
    for i, j in zip(list(label), list(token_presence)):
        if i == True and j == True:
            A_token += 1
        elif i == False and j == False:
            D_token += 1
        elif i == True and j == False:
            C_token += 1
        elif i == False and j == True:
            B_token += 1
    return A_token, B_token, C_token, D_token

def My_ParallelFunction(iterable_token, dataframe, label_array):
    A = {}
    B = {}
    C = {}
    D = {}
    token_count = {}
    token_list = []

    for token in iterable_token:
        try:
            token_presence = dataframe['Master'].str.contains(r'\b' + token + r'\b')
            token_presence_sum = sum(token_presence)
            if token_presence_sum:
                A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
                A[token] = A_token
                B[token] = B_token
                C[token] = C_token
                D[token] = D_token
                token_count[token] = token_presence_sum
                token_list.append(token)
        except Exception:
            pass
    return (A, B, C, D, token_count, token_list)

How do I parallelize the My_ParallelFunction function?

Edit 1: I tried the method suggested in Example 1, because parallelizing the whole function is what I am looking for.

import multiprocessing as mp

with mp.Pool(mp.cpu_count()) as p:
    results = p.starmap(My_ParallelFunction, (iterable_token, dataframe, label_array))

but the error message is:

RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 47, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
"""

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
<timed exec> in <module>

/usr/lib/python3.6/multiprocessing/pool.py in starmap(self, func, iterable, chunksize)
    272         `func` and (a, b) becomes func(a, b).
    273         '''
--> 274         return self._map_async(func, iterable, starmapstar, chunksize).get()
    275 
    276     def starmap_async(self, func, iterable, chunksize=None, callback=None,

/usr/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
    642             return self._value
    643         else:
--> 644             raise self._value
    645 
    646     def _set(self, i, obj):

TypeError: My_ParallelFunction() takes 3 positional arguments but 949 were given
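
Looking at the traceback, starmap() expects an iterable of argument tuples, one tuple per call; passing the single tuple (iterable_token, dataframe, label_array) makes it unpack the 949-element iterable_token as the arguments of the first call. The call should presumably have this shape instead (a sketch; per_token_function is a hypothetical version of the function that handles a single token):

import multiprocessing as mp

# one argument tuple per call: each worker call receives exactly
# (token, dataframe, label_array)
args = [(token, dataframe, label_array) for token in iterable_token]
with mp.Pool(mp.cpu_count()) as p:
    results = p.starmap(per_token_function, args)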

Edit 2: Here is the file I am using. You can download it from here and unzip it. Also, run the script below to get the required input parameters. Make sure to install nltk, pandas and numpy, and change the path to the file TokenFile.csv.

from nltk import word_tokenize, sent_tokenize
import pandas as pd
import numpy as np

dataframe = pd.read_csv('/home/user/TokenFile.csv', nrows=100)

def get_uniquetoken(stop_words, input_doc_list):
    # get unique words across all documents
    if stop_words:
        unique_words = [word for doc in input_doc_list
                        for sent in sent_tokenize(doc)
                        for word in word_tokenize(sent)
                        if word not in stop_words]
    else:
        unique_words = [word for doc in input_doc_list
                        for sent in sent_tokenize(doc)
                        for word in word_tokenize(sent)]
    unique_words = set(unique_words)
    print('unique_words done! length is:', len(unique_words))
    return unique_words


input_token_list = dataframe['Master'].tolist()
label_array = dataframe['label_array'].tolist()
iterable_token = get_uniquetoken(None, input_token_list)

Edit 3: This is the solution I am using:

def My_ParallelFunction(iterable_token, dataframe, label_array):
    A = {}
    B = {}
    C = {}
    D = {}
    token_count = {}
    token_list = []

    with mp.Pool(4) as p:
        token_result = p.starmap(
            _loop,
            [(token, dataframe, label_array, A, B, C, D, token_count, token_list)
             for token in iterable_token]
        )
    return token_result  # instead of (A, B, C, D, token_count, token_list)


def _loop(token, dataframe, label_array, A, B, C, D, token_count, token_list):
    try:
        token_presence = dataframe['Master'].str.contains(r'\b' + token + r'\b')
        token_presence_sum = sum(token_presence)
        if token_presence_sum:
            A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
            A[token] = A_token
            B[token] = B_token
            C[token] = C_token
            D[token] = D_token
            token_count[token] = token_presence_sum
            token_list.append(token)
    except Exception:
        pass
    return A, B, C, D, token_count, token_list

However, it is not giving me the result I want: instead of six aggregated collections, I get a 949 x 6 structure of per-token results of varying sizes.
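
I suspect the reason is that each worker process receives pickled copies of A, B, C, D, token_count and token_list, so mutations made inside the workers never reach the parent's objects; data only comes back through return values. A minimal sketch demonstrating this behaviour:

import multiprocessing as mp

def mutate(d):
    d['x'] = 1  # mutates the worker's private copy only
    return d

if __name__ == '__main__':
    shared = {}
    with mp.Pool(2) as p:
        copies = p.map(mutate, [shared, shared])
    print(shared)  # {}  -> the parent's dict is unchanged
    print(copies)  # [{'x': 1}, {'x': 1}]  -> data comes back via return values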


Solution

  • Here are two toy examples to show how you can parallelize a similar function.

    First option. If you want to parallelize the whole function, you can do that with Pool.starmap(). starmap() works like map(), but it lets you pass multiple arguments to the function.

    from multiprocessing import Pool
    import time
    
    
    #Example 1 Simple function parallelization
    def f(a, b, c, _list):
        x = a+b+c
        time.sleep(1)
        _list.append(x)
        return _list
    
    inputs = [
        (1,2,3,['a','b','c']),
        (1,2,3,['d','e','f']),
        (1,2,3,['x','y','z']),
        (1,2,3,['A','B','C']),
    ]
    
    start = time.time()
    with Pool(4) as p:
        results = p.starmap(f, inputs)
    end = time.time()
    
    for r in results:
        print(r)
        
    print(f'done in {round(end-start, 3)} seconds')
    

    Output:

    ['a', 'b', 'c', 6]
    ['d', 'e', 'f', 6]
    ['x', 'y', 'z', 6]
    ['A', 'B', 'C', 6]
    done in 1.084 seconds
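
    One caveat: with start methods that spawn a fresh interpreter for each worker (Windows, and macOS on recent Python versions), the Pool must be created under an if __name__ == '__main__' guard, otherwise every worker re-imports the module and tries to start its own pool. A minimal sketch of the same call with the guard (assuming f and inputs are defined as above):

    if __name__ == '__main__':
        with Pool(4) as p:
            results = p.starmap(f, inputs)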
    

    Second option. If you want to parallelize only the for-loop inside the function, rewrite the loop body as its own function and call it with Pool.map() or Pool.starmap().

    #Example 2. Function calling a parallel function
    
    #loop
    def g(_string):
        time.sleep(1)
        return _string + '*'
    
    #outer function
    def f(a, b, c, _list):
        x = a+b+c
        _list.append(str(x))
        #loop parallelization
        with Pool(4) as p:
            new_list = p.map(g, _list)
        return new_list
    
    start = time.time()
    result = f(1,2,3,['a','b','c'])
    end = time.time()
    
    print(result)
    print(f'done in {round(end-start, 3)} seconds')
    

    Output:

    ['a*', 'b*', 'c*', '6*']
    done in 1.048 seconds
    

    Note that the "loop function" contains the logic to deal with a single element of the iterable; Pool.map() takes care of running it for all the elements.

    The time.sleep(1) calls simulate a time-consuming calculation. If the parallelization works, you should be able to process 4 inputs in about 1 second rather than 4 seconds.
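
    If the loop function needs extra, constant arguments besides the element itself, the same pattern works with Pool.starmap() by pairing each element with the constants. A small sketch (g2 is a hypothetical variant of g that takes the suffix as a parameter):

    def g2(_string, suffix):
        time.sleep(1)
        return _string + suffix

    with Pool(4) as p:
        new_list = p.starmap(g2, [(s, '*') for s in ['a', 'b', 'c']])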

    Here is an example using your code:

    import multiprocessing as mp

    def My_ParallelFunction(iterable_token, dataframe, label_array):
        # one (token, dataframe, label_array) tuple per token, so each
        # worker call handles exactly one token
        with mp.Pool(4) as p:
            token_result = p.starmap(
                _loop,
                [(token, dataframe, label_array) for token in iterable_token]
            )
        return token_result


    def _loop(token, dataframe, label_array):
        A = {}
        B = {}
        C = {}
        D = {}
        token_count = {}
        token_list = []
        try:
            token_presence = dataframe['Master'].str.contains(r'\b' + token + r'\b')
            token_presence_sum = sum(token_presence)
            if token_presence_sum:
                A_token, B_token, C_token, D_token = cross_tab(label_array, token_presence)
                A[token] = A_token
                B[token] = B_token
                C[token] = C_token
                D[token] = D_token
                token_count[token] = token_presence_sum
                token_list.append(token)
        except Exception as e:
            print(e)
        # always return the six collections (empty when the token is absent
        # or an exception occurred) so every entry of token_result has the
        # same shape and can be merged uniformly
        return A, B, C, D, token_count, token_list
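
    Each call to _loop returns its own (A, B, C, D, token_count, token_list) tuple, so the per-token results still have to be merged to recover the six aggregated collections that the sequential version produced. A minimal merge sketch (merge_results is a hypothetical helper, not part of the original code):

    def merge_results(token_result):
        A, B, C, D, token_count = {}, {}, {}, {}, {}
        token_list = []
        for a, b, c, d, count, tokens in token_result:
            A.update(a)
            B.update(b)
            C.update(c)
            D.update(d)
            token_count.update(count)
            token_list.extend(tokens)
        return A, B, C, D, token_count, token_list

    A, B, C, D, token_count, token_list = merge_results(
        My_ParallelFunction(iterable_token, dataframe, label_array)
    )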