When using the multiprocessing library (Python 3.5), I run into functions that never finish. Everything appears to be processed, but the main program does not continue.
My current setup is as follows:
# Main.py
import multiprocessing as mp
import pandas as pd
from dosomething import dosomething
csvfolder = 'data/'
data = pd.DataFrame([
    {'a':12,'b':13},
    {'a':2,'b':14},
    {'a':1,'b':23},
    {'a':123,'b':16},
    {'a':142,'b':14},
])
print('start')
result = mp.Queue()
dos = mp.Process(target=dosomething, args=(data,csvfolder,result,'dosomething'))
dos.start()
dos.join()
result.get()
print('finished')
Then, in dosomething.py, I have defined a function dosomething that does the following:
# dosomething.py
import os
def dosomething(data,csvfolder,result,name):
    data.to_csv(os.path.join(csvfolder,'test.csv'))
    result.put({name:{'data':data}})
The function appears to execute as expected but never ends, causing the main program to stall. When I interrupt the program, I get the following message:
Process Process-1:
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Based on the comments, I learned that result.put() takes a long time with the actual data and becomes unresponsive. The result I'm placing on the queue is a dictionary with two elements, one of which is a pandas DataFrame of several hundred thousand records.
How can I solve this?
From the multiprocessing guidelines:
... whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined.
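Just swap the Process.join and Queue.get lines to get it working. A process that has put items on a queue waits, before terminating, until its feeder thread has flushed all buffered data into the underlying pipe. With a large DataFrame the pipe fills up, so the child cannot exit until the parent reads from the queue, and the parent is stuck in join() waiting for the child.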
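The reordering can be sketched as follows. This is a minimal illustration rather than the asker's exact program: it leaves out pandas and the CSV write, uses a large list as a hypothetical stand-in for the DataFrame, and wraps the logic in a helper named run_worker, which is not part of the original code. It also assumes the "fork" start method where available, matching the Linux setup in the traceback.

```python
import multiprocessing as mp

# Assumption: prefer "fork" (the default on the asker's Linux setup) when it
# exists; otherwise fall back to the platform's default start method.
_method = "fork" if "fork" in mp.get_all_start_methods() else None
ctx = mp.get_context(_method)


def dosomething(payload, result, name):
    # Stand-in for the question's worker: skips the CSV write and only
    # puts a large object on the queue.
    result.put({name: {'data': payload}})


def run_worker(payload):
    # Hypothetical helper: start the child, read its result, then join.
    result = ctx.Queue()
    dos = ctx.Process(target=dosomething, args=(payload, result, 'dosomething'))
    dos.start()
    output = result.get()  # drain the queue first ...
    dos.join()             # ... then wait for the child to exit
    return output


if __name__ == '__main__':
    print('start')
    big = list(range(500000))  # stand-in for the large DataFrame
    out = run_worker(big)
    print(len(out['dosomething']['data']))
    print('finished')
```

Because get() runs before join(), the parent empties the pipe while the child is still feeding it, so the child's feeder thread can finish and the process can exit normally.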