Search code examples
pythonmultiprocessingpoolargskeyword-argument

Multiprocessing.pool with a function that has multiple args and kwargs


I would like to parallelise a calculation using the mutliprocessing.pool method. The problem is that the function I would like to use in the calculation presents two args and optional kwargs, being the first argument a dataframe, the second one a str and any kwargs a dictionary.

Both the dataframe and the dictionary I want to use are the same for all the calculations I am trying to carry out, being only the second arg the one that keeps changing. I was therefore hoping to be able to pass it as a list of different strings using the map method to the already packed function with the df and dict.

from utils import *
import multiprocessing
from functools import partial



def sumifs(df, result_col, **kwargs):

    compare_cols = list(kwargs.keys())
    operators = {}
    for col in compare_cols:
        if type(kwargs[col]) == tuple:
            operators[col] = kwargs[col][0]
            kwargs[col] = list(kwargs[col][1])
        else:
            operators[col] = operator.eq
            kwargs[col] = list(kwargs[col])
    result = []
    cache = {}
    # Go through each value
    for i in range(len(kwargs[compare_cols[0]])):
        compare_values = [kwargs[col][i] for col in compare_cols]
        cache_key = ','.join([str(s) for s in compare_values])
        if (cache_key in cache):
            entry = cache[cache_key]
        else:
            df_copy = df.copy()
            for compare_col, compare_value in zip(compare_cols, compare_values):
                df_copy = df_copy.loc[operators[compare_col](df_copy[compare_col], compare_value)]
            entry = df_copy[result_col].sum()
            cache[cache_key] = entry
        result.append(entry)
    return pd.Series(result)

if __name__ == '__main__':

    ca = read_in_table('Tab1')
    total_consumer_ids = len(ca)

    base = pd.DataFrame()
    base['ID'] = range(1, total_consumer_ids + 1)


    result_col= ['A', 'B', 'C']
    keywords = {'Z': base['Consumer archetype ID']}

    max_number_processes = multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=max_number_processes) as pool:
        results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)
    print(results)

However, when I run the code above I get the following error: TypeError: sumifs() missing 1 required positional argument: 'result_col'. How could I provide the function with the first arg and kwargs, while providing the second argument as a list of str so I can paralelise the calculation? I have read several similar questions in the forum but none of the solutions seem to work for this case...

Thank you and apologies if something is not clear, I just learnt of the multiprocessing package today!


Solution

  • Let's have a look at two part of your code.

    First the sumifs function declaration:

    def sumifs(df, result_col, **kwargs):

    Secondly, the call to this function with the relevant parameters.

    # Those are the params
    ca = read_in_table('Tab1')
    keywords = {'Z': base['Consumer archetype ID']}
    
    # This is the function call
    results = pool.map(partial(sumifs, a=ca, kwargs=keywords), tasks)
    

    Update 1:

    After the original code has been edited.It look like the problem is the positional argument assignment, try to discard it.

    replace the line:

    results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)
    

    with:

    results = pool.map(partial(sumifs, ca, **keywords), result_col)
    

    An example code:

    import multiprocessing
    from functools import partial
    
    def test_func(arg1, arg2, **kwargs):
        print(arg1)
        print(arg2)
        print(kwargs)
        return arg2
    
    if __name__ == '__main__':
        list_of_args2 = [1, 2, 3]
        just_a_dict = {'key1': 'Some value'}
        with multiprocessing.Pool(processes=3) as pool:
            results = pool.map(partial(test_func, 'This is arg1', **just_a_dict), list_of_args2)
        print(results)
    

    Will output:

    This is arg1
    1
    {'key1': 'Some value'}
    This is arg1
    2
    {'key1': 'Some value'}
    This is arg1
    2
    {'key1': 'Some value'}
    ['1', '2', '3']
    

    More example for how to Multiprocessing.pool with a function that has multiple args and kwargs


    Update 2:

    Extended example (due to comments):

    I wonder however, in the same fashion, if my function had three args and kwargs, and I wanted to keep arg1, arg3 and kwargs costant, how could I pass arg2 as a list for multiprocessing? In essence, how will I inidicate multiprocessing that map(partial(test_func, 'This is arg1', 'This would be arg3', **just_a_dict), arg2) the second value in partial corresponds to arg3 and not arg2?

    The Update 1 code would have change as follow:

    # The function signature
    def test_func(arg1, arg2, arg3, **kwargs):
    
    # The map call
    pool.map(partial(test_func, 'This is arg1', arg3='This is arg3', **just_a_dict), list_of_args2)
    

    This can be done using the python positional and keyword assignment. Note that the kwargs is left aside and not assigned using a keyword despite the fact that it's located after a keyword assigned value.

    More information about argument assignment differences can be found here.