I have a question about using Pool.starmap.
p1 = pd.dataframe(example1)
p2 = pd.dataframe(example2)

pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)
tpairs = tqdm(pairs, desc='Make pair data..', total=pairs_len)

def mkpair(p1, p2, ext=False):
    result = {}
    if not ext:
        for idx, xcol in enumerate(p1.columns):
            result[f"D_{idx}"] = float(p1[xcol]) - float(p2[xcol])
    return result

pool = Pool(process=4)
pool.starmap(mkpair, tpairs)
pool.close()
pool.join()
I want each pool task to take one element of p1.iterrows() and one of p2.iterrows() from tpairs and receive them as the p1 and p2 arguments, but I get "TypeError: tuple indices must be integers or slices, not str".
I'm also wondering whether it's possible to add the ext=True argument by submitting each task as [p1, p2, ext].
You have several issues with your code:

When you iterate the iterator returned by `pandas.iterrows()`, you are passed a tuple `t` of length 2 where `t[0]` is the row index and `t[1]` is a `pandas.Series` instance. Your worker function, `mkpair`, will be passed two of these tuples, one from each dataframe, as arguments `p1` and `p2`. But you are calling `p1.columns` where `p1` is a tuple, and tuples have no such attribute as `columns` (nor does a `pandas.Series`), so this should have raised an altogether different exception. I don't see how you are getting the exception you claim from the actual code you posted. Moreover, your statement `pool = Pool(process=4)` is incorrect, as the correct keyword argument is `processes`, not `process`. So you could not possibly be executing this code (I will overlook the missing import statements, which I assume you actually have).

What you need to do, assuming you want the progress bar to advance as tasks are completed, is to use a method such as `imap_unordered` or `apply_async` with a callback, which will allow you to update the bar as tasks complete. If you want to store the results in a list in task-submission order rather than in completion order and you are using `imap_unordered`, then you need to pass an index to the worker function, which it then returns along with the result. The code is simpler if you use `apply_async`, but this method does not allow you to submit tasks in "chunks", which becomes an issue if the total number of tasks is very large (see the *chunksize* argument of `imap_unordered`). Here is how you would use each method:
Using `imap_unordered`
from tqdm import tqdm
from multiprocessing import Pool
import itertools
import pandas as pd

columns = ['x', 'y', 'z']

p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6], [3, 2, 1], [6, 5, 4]], columns=columns)
p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12], [9, 8, 7], [12, 11, 10]], columns=columns)

pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

def mkpair(t: tuple) -> tuple:
    from time import sleep
    sleep(1)  # simulate real work
    # unpack the (index, (row1, row2)) tuple:
    idx, rows = t
    # unpack again:
    row1, row2 = rows
    # each row is an (index, Series) tuple from iterrows():
    series1 = row1[1]
    series2 = row2[1]
    return idx, sum(series1[column] * series2[column] for column in columns)

pool = Pool(processes=4)
results = [0] * pairs_len
with tqdm(total=pairs_len) as pbar:
    for idx, total in pool.imap_unordered(mkpair, enumerate(pairs)):
        results[idx] = total
        pbar.update()
pool.close()
pool.join()
print(results)
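As mentioned above, `imap_unordered` accepts a *chunksize* argument, which matters when the number of pairs is large. A minimal sketch (smaller dataframes and no progress bar, purely to show the parameter; the `int(...)` cast just keeps the results as plain Python ints):

```python
from multiprocessing import Pool
import itertools
import pandas as pd

columns = ['x', 'y', 'z']

p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=columns)
p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12]], columns=columns)

def mkpair(t: tuple) -> tuple:
    idx, (row1, row2) = t
    series1, series2 = row1[1], row2[1]
    return idx, int(sum(series1[column] * series2[column] for column in columns))

pairs = itertools.product(p1.iterrows(), p2.iterrows())
results = [0] * (len(p1) * len(p2))
with Pool(processes=2) as pool:
    # chunksize=2 submits tasks to the workers in batches of 2,
    # reducing interprocess-communication overhead for large inputs
    for idx, total in pool.imap_unordered(mkpair, enumerate(pairs), chunksize=2):
        results[idx] = total
print(results)  # [50, 68, 122, 167]
```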
Using `apply_async`
from tqdm import tqdm
from multiprocessing import Pool
import itertools
import pandas as pd

columns = ['x', 'y', 'z']

p1 = pd.DataFrame([[1, 2, 3], [4, 5, 6], [3, 2, 1], [6, 5, 4]], columns=columns)
p2 = pd.DataFrame([[7, 8, 9], [10, 11, 12], [9, 8, 7], [12, 11, 10]], columns=columns)

pairs = itertools.product(p1.iterrows(), p2.iterrows())
pairs_len = len(p1) * len(p2)

def mkpair(row1, row2) -> int:
    from time import sleep
    sleep(1)  # simulate real work
    # each row is an (index, Series) tuple from iterrows():
    series1 = row1[1]
    series2 = row2[1]
    return sum(series1[column] * series2[column] for column in columns)

def my_callback(_):
    # runs in the main process as each task completes
    pbar.update()

pool = Pool(processes=4)
with tqdm(total=pairs_len) as pbar:
    async_futures = [pool.apply_async(mkpair, args=(row1, row2), callback=my_callback)
                     for row1, row2 in pairs]
    results = [async_future.get() for async_future in async_futures]
pool.close()
pool.join()
print(results)
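As for your second question: yes, with `apply_async` you can pass the extra `ext` argument simply by including it in `args` (or via the `kwds` parameter). A minimal sketch using your original `mkpair` signature and its `D_{idx}` result keys; the `ext=True` branch here is hypothetical, since your posted code only handles `if not ext`:

```python
from multiprocessing import Pool
import itertools
import pandas as pd

columns = ['x', 'y']

p1 = pd.DataFrame([[1, 2]], columns=columns)
p2 = pd.DataFrame([[3, 5]], columns=columns)

def mkpair(row1, row2, ext=False):
    # each row is an (index, Series) tuple from iterrows()
    series1, series2 = row1[1], row2[1]
    result = {f"D_{idx}": float(series1[col]) - float(series2[col])
              for idx, col in enumerate(columns)}
    if ext:
        result["ext"] = True  # hypothetical extra behaviour when ext=True
    return result

pairs = itertools.product(p1.iterrows(), p2.iterrows())
with Pool(processes=2) as pool:
    # ext=True is passed as a third positional argument
    # (equivalently: kwds={'ext': True})
    futures = [pool.apply_async(mkpair, args=(row1, row2, True))
               for row1, row2 in pairs]
    results = [f.get() for f in futures]
print(results)  # [{'D_0': -2.0, 'D_1': -3.0, 'ext': True}]
```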