I'm starting to add multiprocessing to my code, as the task I'm trying to automate is computationally expensive. Based on the information I've been gathering on Stack Overflow, the module structure in my code is the following. I'm working with Python 3.7 on Windows 10.
main: where the multiprocessing function is called, besides loading inputs and such.
import pandas as pd
import numpy as np
import run

def do():
    df = pd.DataFrame({'Identifier': ['id_1', 'id_1', 'id_1', 'id_1', 'id_1', 'id_2', 'id_2', 'id_2', 'id_2', 'id_2', 'id_3', 'id_3', 'id_3', 'id_3', 'id_3'],
                       'float_id': [1, 2, 3, 4, 5, 10, 25, 33, 45, 50, .1, .2, .3, .4, .5],
                       'a': np.random.rand(15),
                       'b': np.random.rand(15),
                       'c': np.random.rand(15)})
    v_column = ['a', 'b', 'c']
    df_out = run.function_multiprocessing(df, v_column)
    return df_out

if __name__ == '__main__':
    df_out = do()
run: where the pool is created and the function is applied over each group.

import defs
import pandas as pd
import multiprocessing

def iterator(data, id_col, value_col):
    for col in value_col:
        yield (data[col].values, data[id_col].values)

def function_multiprocessing(data, v_column):
    list_df = []
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        for identifier, df_f in data.groupby(['Identifier']):
            print(identifier)
            data_f = pool.starmap(defs.function_to_apply, iterator(df_f, 'float_id', v_column))
            out = pd.DataFrame([data_f], index=[identifier])  # one row per identifier
            list_df.append(out)
    df_out = pd.concat(list_df)
    return df_out
None of these modules belong to my PYTHONPATH; they are all located in the same folder. What I'm trying to do is apply a function over each "Identifier", based on the values found in "float_id" and the values of columns "a", "b" and "c" respectively. For the sake of simplicity, think of a weighted average of the column values, using the "float_id" values as weights.
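For reference, a minimal sketch of what defs.py might look like under that weighted-average interpretation (the name and signature of function_to_apply come from the code above; its body here is an assumption):

```python
# defs.py -- hypothetical body for function_to_apply, assuming the
# weighted-average interpretation. Each task receives one column's
# values plus the matching float_id values, as yielded by iterator().
import numpy as np

def function_to_apply(values, weights):
    # starmap unpacks each (values, weights) tuple into two arguments
    return np.average(values, weights=weights)
```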
When I execute the code, I get the following error, no matter what I try, repeated for every worker.
Process SpawnPoolWorker-1:
Traceback (most recent call last):
File "C:\Users\xxxx\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 297, in _bootstrap
self.run()
File "C:\Users\xxxx\AppData\Local\Continuum\anaconda3\lib\multiprocessing\process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\xxxx\AppData\Local\Continuum\anaconda3\lib\multiprocessing\pool.py", line 110, in worker
task = get()
File "C:\Users\xxxx\AppData\Local\Continuum\anaconda3\lib\multiprocessing\queues.py", line 354, in get
return _ForkingPickler.loads(res)
ModuleNotFoundError: No module named 'defs'
I've tried including the line multiprocessing.set_start_method("fork") right after importing the multiprocessing module, which raised an error; the same happened with "spawn". I also tried passing the module "defs" as a parameter to the function_multiprocessing method and calling it from main, with no success. The same error occurs when providing "function_to_apply" as a parameter.
What am I doing wrong? How could I make this work?
Thanks a lot in advance!
UPDATE: When importing the defs.py module inside the function_multiprocessing method, like this:

def function_multiprocessing(data, v_column):
    import defs
    list_df = []
    ...

it does not raise any errors. However, passing the module in as a variable did not work.
Had a similar error. A workaround is to call subprocess.run directly in map, with your executable logic in a Python file as opposed to a function. (map is similar to starmap, but we can't use starmap here, as it would unpack each command list into separate arguments for subprocess.run instead of passing it whole as the "main_script" command below.) Note that the iterable must be a list of command lists, one per task:

pool.map(subprocess.run, [[sys.executable, main_script, *args]])
where main_script is a Python file whose
if __name__ == '__main__':
block will be executed. You can put the logic from your function_to_apply there.
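Putting the workaround together, a runnable sketch (the "-c" snippets here stand in for the real main_script path and its arguments, which are assumptions):

```python
import subprocess
import sys
from multiprocessing import Pool

# Each task is one complete argv list, so map() hands subprocess.run a
# single ready-to-run command. In practice each "-c" snippet would be
# replaced by the script path, e.g. [sys.executable, "main_script.py", arg].
commands = [
    [sys.executable, "-c", "print('processing id_1')"],
    [sys.executable, "-c", "print('processing id_2')"],
]

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        results = pool.map(subprocess.run, commands)
    print([r.returncode for r in results])  # [0, 0]
```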