I'm trying to run a multiprocessing task through a function that receives a dataframe and a condition and returns a new dataframe.
My problem is that I get the error "not supported between instances of 'str' and 'int'" when I call the function.
import multiprocessing
import pandas as pd

def operation(data, condition):
    .
    .   # no problem in the function itself; I tested it in isolation
    .
    .
    return pd.DataFrame(result_of_operation)
if __name__ == '__main__':
    data = pd.read_csv(r"C:\Users\max\test.csv", index_col=False, encoding='UTF-8')
    column_names = ["A", "B", "C"]
    new_df = pd.DataFrame(columns=column_names)
    condition = ['orange', 'banana', 'apple']

    with multiprocessing.Pool(8) as p:
        for x in range(0, len(condition)):
            new_df.append(p.map(operation, data, condition[x]), ignore_index=True)
I believe the problem is with my map call, since everything works if I call the function by itself, like:
for x in range(0, len(condition)):
    new_df.append(operation(data, condition[x]), ignore_index=True)
It looks like the way you're passing parameters to map() is wrong. Pool.map() takes a function and a single iterable; it has no way to pass extra per-call arguments, and its optional third positional argument is chunksize, which must be an int. So in p.map(operation, data, condition[x]) the string condition[x] lands in the chunksize slot, which is the likely source of your str/int comparison error. To bind the shared data argument, use functools.partial and map over condition:
from functools import partial

with multiprocessing.Pool() as p:
    new_df = pd.concat(p.map(partial(operation, data), condition), ignore_index=True)
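In case partial() is unfamiliar: it freezes data as the first argument of operation, producing a one-argument callable that map() can feed each condition into. A minimal sketch, with a toy operation standing in for yours (plain built-in map here, just to show the binding):

```python
from functools import partial

def operation(data, condition):
    # toy stand-in: just pair up the two arguments
    return (data, condition)

# partial "freezes" the first argument, leaving a one-argument callable
bound = partial(operation, "shared-data")
results = list(map(bound, ["orange", "banana", "apple"]))
print(results)
# → [('shared-data', 'orange'), ('shared-data', 'banana'), ('shared-data', 'apple')]
```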
Perhaps to make things even clearer, here's a complete working example. Note that DataFrame.append() is deprecated (and removed in pandas 2.0), so I use the preferred concat() here:
from pandas import DataFrame, concat
from concurrent.futures import ProcessPoolExecutor
from functools import partial

def process(columns, data):
    return DataFrame(data, columns=columns)

def main():
    data = [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
    with ProcessPoolExecutor(max_workers=3) as p:
        newdf = concat(p.map(partial(process, ['A']), data), ignore_index=True)
    print(newdf)

if __name__ == '__main__':
    main()
Output:
A
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9