Tags: python, python-3.x, python-multiprocessing, python-multithreading

Python multiprocessing and too many open files


I have a problem with multiprocessing in Python. In the code below I start 7 workers (multiprocessing.Process) and one result thread (threading.Thread). Before and after processing the data (extracting some metadata from files), I run:

lsof | grep ' <user> ' | grep 'python3'

and it lists open handles such as:

python3   17291              ivo  DEL       REG               0,20             5288943 /dev/shm/ZMcs2H
python3   17291              ivo  DEL       REG               0,20             5288942 /dev/shm/3iMR4q
python3   17291              ivo  DEL       REG               0,20             5288941 /dev/shm/XPYh79

When I run the multiprocessing step many times in a loop (processing a continuous stream of messages), these handles accumulate until I get:

OSError: [Errno 24] Too many open files

Am I doing something wrong with the multiprocessing package?

import multiprocessing
import threading

def worker_process_results(meta_queue, res_dict):
    while True:
        try:
            (path, meta) = meta_queue.get()
            res_dict[path] = meta
        finally:
            meta_queue.task_done()

def multiprocess_get_metadata(paths, thread_count = 7):
    """ Scan files for metadata (multiprocessing). """
    file_queue = multiprocessing.JoinableQueue()
    meta_queue = multiprocessing.JoinableQueue()

    res_dict   = dict()
    # result thread
    meta_thread = threading.Thread(target=worker_process_results,
                                   args=(meta_queue, res_dict))
    meta_thread.daemon = True
    meta_thread.start()

    workers = []

    for _ in range(min(thread_count, len(paths))):
        worker = MetaDataWorker(file_queue, meta_queue)
        worker.daemon = True
        worker.start()        
        workers.append(worker)

    for path in paths:
        file_queue.put(path)

    file_queue.join()
    meta_queue.join()

    for x in workers:
        x.terminate()

    return res_dict

class MetaDataWorker(multiprocessing.Process):
    ''' Use library to get meta data from file. '''

    def __init__(self, file_queue, meta_queue):
        ''' Constructor. '''
        super().__init__()

        self.file_queue = file_queue
        self.meta_queue = meta_queue

    def run(self):
        """ Run. """

        while True:
            try:
                path = self.file_queue.get()
                meta = getmetadata(path)
                self.meta_queue.put((path, meta))
            except Exception as err:
                print("Thread end.")
                print("{0}".format(err))
            finally:
                self.file_queue.task_done()

Solution

  • Already solved: I needed to send sentinel ("end of work") values to the workers and to the result thread to stop their never-ending loops; a sketch of that approach is below.
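
A minimal sketch of that sentinel approach, based on the code above (the STOP name, the process_count parameter, and the join()-based shutdown are my assumptions, not necessarily the original fix; getmetadata stands in for the actual extraction routine):

import multiprocessing
import threading

STOP = None  # assumed sentinel value marking "end of work"

def worker_process_results(meta_queue, res_dict):
    while True:
        item = meta_queue.get()
        try:
            if item is STOP:
                break  # sentinel received, leave the loop
            path, meta = item
            res_dict[path] = meta
        finally:
            meta_queue.task_done()

class MetaDataWorker(multiprocessing.Process):
    def __init__(self, file_queue, meta_queue):
        super().__init__()
        self.file_queue = file_queue
        self.meta_queue = meta_queue

    def run(self):
        while True:
            path = self.file_queue.get()
            try:
                if path is STOP:
                    break  # sentinel received, stop this worker
                self.meta_queue.put((path, getmetadata(path)))
            except Exception as err:
                print("Worker error: {0}".format(err))
            finally:
                self.file_queue.task_done()

def multiprocess_get_metadata(paths, process_count=7):
    """ Scan files for metadata, shutting workers down via sentinels. """
    file_queue = multiprocessing.JoinableQueue()
    meta_queue = multiprocessing.JoinableQueue()
    res_dict = dict()

    meta_thread = threading.Thread(target=worker_process_results,
                                   args=(meta_queue, res_dict))
    meta_thread.start()

    workers = [MetaDataWorker(file_queue, meta_queue)
               for _ in range(min(process_count, len(paths)))]
    for w in workers:
        w.start()

    for path in paths:
        file_queue.put(path)
    for _ in workers:
        file_queue.put(STOP)  # one sentinel per worker

    for w in workers:
        w.join()  # each worker exits cleanly after reading its sentinel

    meta_queue.put(STOP)  # all results are queued, now stop the result thread
    meta_thread.join()
    return res_dict

The key change is that join() replaces terminate(): each worker drains the file queue until it reads its sentinel and then exits on its own, and the result thread is stopped the same way. That fits the lsof output, since the deleted /dev/shm entries appear to be the shared-memory objects (semaphores and arenas) backing the multiprocessing queues; terminating workers while creating fresh queues on every loop iteration leaves such handles open until the Errno 24 limit is reached.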