I have problem with multiprocessing in python. In code below I call 7 workers (multiprocessing.Process) and one result threading.Thread. Before and after processing of data (extracting some metadata from files), I run:
lsof | grep ' <user> ' | grep 'python3'
And I get some open handles as:
python3 17291 ivo DEL REG 0,20 5288943 /dev/shm/ZMcs2H
python3 17291 ivo DEL REG 0,20 5288942 /dev/shm/3iMR4q
python3 17291 ivo DEL REG 0,20 5288941 /dev/shm/XPYh79
and when running multiprocessing many times in loop (processing some continuous messages) I get
OSError: [Errno 24] Too many open files
Is there something wrong with dealing with multiprocessing package?
def worker_process_results(meta_queue, res_dict):
while True:
try:
(path, meta) = meta_queue.get()
res_dict[path] = meta
finally:
meta_queue.task_done()
def multiprocess_get_metadata(paths, thread_count = 7):
""" Scan files for metadata (multiprocessing). """
file_queue = multiprocessing.JoinableQueue()
meta_queue = multiprocessing.JoinableQueue()
res_dict = dict()
# result thread
meta_thread = threading.Thread(target = lambda: worker_process_results(meta_queue, res_dict))
meta_thread.daemon = True
meta_thread.start()
workers = []
for _ in range(0, min(thread_count, len(paths))):
worker = MetaDataWorker(file_queue, meta_queue)
worker.daemon = True
worker.start()
workers.append(worker)
for path in paths:
file_queue.put(path)
file_queue.join()
meta_queue.join()
for x in workers:
x.terminate()
return res_dict
class MetaDataWorker(multiprocessing.Process):
''' Use library to get meta data from file. '''
def __init__(self, file_queue, meta_queue):
''' Constructor. '''
super().__init__()
self.file_queue = file_queue
self.meta_queue = meta_queue
def run(self):
""" Run. """
while True:
try:
path = self.file_queue.get()
meta = getmetadata(path)
meta = None
self.meta_queue.put((path, meta))
except Exception as err:
print("Thread end.")
print("{0}".format(err))
finally:
self.file_queue.task_done()
Already solved, I needed to send some ending signals to workers and result thread to stop never-ending loop