Search code examples
pythondataframemultiprocessingpython-itertoolspool

How to use multiprocessing pool.starmap with multiple arguments


I have a question.. for using Pool.starmap..

p1 = pd.dataframe(example1)
p2 = pd.dataframe(example2)

pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

tpairs = tqdm(pairs, desc='Make pair data..', total=pairs_len)

def mkpair(p1, p2, ext=False):
    result = {}
    if not ext:
        for idx, xcol in enumerate(p1.columns):
            result[f"D_{idx}"] = float(p1[xcol]) - float(p2[xcol])
    return result

pool = Pool(process=4)
pool.starmap(mkpair, tpairs)
pool.close()
pool.join()

I want to get one of P1.iterrows and one of P2.iterrows in tpairs in the pool and put them as p1 and p2 arguments.

but occur "TypeError: tuple indices must be integers or slices, not str"

and i want I'm also wondering if it's possible to put it in the expression [p1, p2, ext] by adding the ext=True argument.


Solution

  • You have several issues with your code:

    1. When you iterate the iterator returned by calling method pandas.iterrows() you are passed a tuple t of length 2 where t[0] is the row index and t[1] is a pandas.Series instance. Your worker function, mkpair, will be passed two of these tuples, one from each dataframe. as arguments p1 and p2. But you are calling p1.columns where p1 is a tuple and tuples have no such attribute as columns. So this should have raised an altogether different exception (and a pandas.Series has no such method either). So I don't see how you are getting the exception you claim from the actual code you posted. Moreover, your statement pool = Pool(process=4) is incorrect as the correct keyword argument is processes not process. So you could not possibly be executing this code (I will overlook the missing import statements, which I assume you actually have).
    2. You are creating a progress bar that will be iterated upon submission of the tasks to the multiprocessing pool. So even if your work function took a second to complete (and was coded correctly), the progress bar would practically instantaneously shoot up to 100% as the tasks are submitted. So the only progress you are measuring is task submission, not completion.

    What you need to do, assuming you want the progress bar to progress as tasks are completed, is to use a method such as imap_unordered or apply_async with a callback, which will allow you to update the bar as tasks complete. If you want to store the results in a list in task submission order rather than in completion order and you are using imap_unordered, then you need to pass an index to the worker function that it then returns back with the result. The code is simpler if you use apply_async but this method does not allow you to submit tasks in "chunks", which becomes an issue if the number of total tasks being submitted is very large (see the chunksize argument for method imap_unordered). Here is how you would use each method:

    Using imap_unordered

    from tqdm import tqdm
    from multiprocessing import Pool
    import itertools
    import pandas as pd
    
    columns = ['x', 'y', 'z']
    
    p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6], [3, 2, 1], [6, 5, 4]], columns=columns)
    p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12], [9, 8, 7], [12, 11, 10]], columns=columns)
    
    pairs = itertools.product(p1.iterrows(), p2.iterrows())
    pairs_len = len(p1) * len(p2)
    
    def mkpair(t: tuple) -> int:
        from time import sleep
    
        sleep(1)
        # unpack tuple:
        idx, rows = t
        # unpack again:
        row1, row2 = rows
        series1 = row1[1]
        series2 = row2[1]
        return idx, sum(series1[column] * series2[column] for column in columns)
    
    pool = Pool(processes=4)
    results = [0] * pairs_len
    with tqdm(total=pairs_len) as pbar:
        for idx, total in pool.imap_unordered(mkpair, enumerate(pairs)):
            results[idx] = total
            pbar.update()
    pool.close()
    pool.join()
    print(results)
    

    Using apply_async

    from tqdm import tqdm
    from multiprocessing import Pool
    import itertools
    import pandas as pd
    
    columns = ['x', 'y', 'z']
    
    p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6], [3, 2, 1], [6, 5, 4]], columns=columns)
    p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12], [9, 8, 7], [12, 11, 10]], columns=columns)
    
    pairs = itertools.product(p1.iterrows(), p2.iterrows())
    pairs_len = len(p1) * len(p2)
    
    def mkpair(row1, row2) -> int:
        from time import sleep
    
        sleep(1)
        series1 = row1[1]
        series2 = row2[1]
        return sum(series1[column] * series2[column] for column in columns)
    
    def my_callback(_):
        pbar.update()
    
    pool = Pool(processes=4)
    with tqdm(total=pairs_len) as pbar:
        async_futures = [pool.apply_async(mkpair, args=(row1, row2), callback=my_callback)
                         for row1, row2 in pairs]
        results = [async_future.get() for async_future in async_futures]
    pool.close()
    pool.join()
    print(results)