Tags: python, multiprocessing, python-multiprocessing, process-pool

python multiprocessing pool notification on worker refreshing


I'm using Python 2.7's multiprocessing.Pool to manage a pool of 3 workers. Each worker is fairly complicated and there's a resource leak (presumably) in some third-party code that causes problems after 6-8 hours of continuous runtime. So I'd like to use maxtasksperchild to have workers refreshed periodically.

I'd also like each worker to write to its own separate log file. Without maxtasksperchild I use a shared multiprocessing.Value to assign an integer (0, 1, or 2) to each worker, then use the integer to name the log file.
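
A simplified sketch of that Value-based setup (names such as init_worker and the exact log-file handling are illustrative):

    from multiprocessing import Pool, Value


    def init_worker(counter):
        # atomically claim the next free integer (0, 1, 2, ...)
        # and open the log file named after it
        global log_file
        with counter.get_lock():
            worker_id = counter.value
            counter.value += 1
        log_file = open('worker_{}.log'.format(worker_id), 'a')


    def work(item):
        log_file.write('processing {}\n'.format(item))
        log_file.flush()
        return item


    if __name__ == '__main__':
        counter = Value('i', 0)
        pool = Pool(3, initializer=init_worker, initargs=(counter,))
        print(pool.map(work, range(6)))
        pool.close()
        pool.join()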

With maxtasksperchild I'd like to reuse log files once a worker is done. So if this whole thing runs for a month, I only want three log files, not one log file for each worker that was spawned.

If I could pass a callback (e.g. a finalizer to go along with the initializer currently supported), this would be straightforward. Without that, I can't see a robust and simple way to do it.


Solution

  • That's AFAIK undocumented, but multiprocessing.util has a Finalize class, which "supports object finalization using weakrefs". You could use it to register a finalizer within your initializer.

    I don't see multiprocessing.Value as a helpful synchronization choice in this case, though. Multiple workers could exit simultaneously, and signaling which file-integers are free again is more than a (locked) counter can provide.

    I would suggest using multiple bare multiprocessing.Lock instances, one for each file, instead:

    from multiprocessing import Pool, Lock, current_process
    from multiprocessing.util import Finalize
    
    
    def f(n):
        """Simulate CPU-bound work, then report which file-slot
        this worker holds."""
        global fileno
        for _ in range(int(n)):  # xrange for Python 2
            pass
        return fileno


    def init_fileno(file_locks):
        """Claim the first free file-slot by acquiring its lock and
        register a finalizer that frees the slot at worker-exit."""
        for i, lock in enumerate(file_locks):
            if lock.acquire(False):  # non-blocking attempt
                globals()['fileno'] = i
                print("{} using fileno: {}".format(current_process().name, i))
                # released at worker-exit so a replacement
                # worker can take over this slot
                Finalize(lock, lock.release, exitpriority=15)
                break
    
    
    if __name__ == '__main__':
    
        n_proc = 3
        file_locks = [Lock() for _ in range(n_proc)]
    
        pool = Pool(
            n_proc, initializer=init_fileno, initargs=(file_locks,),
            maxtasksperchild=2
        )
    
        print(pool.map(func=f, iterable=[50e6] * 18))
        pool.close()
        pool.join()
        # all locks should be available if all finalizers did run
        assert all(lock.acquire(False) for lock in file_locks)
    

    Output:

    ForkPoolWorker-1 using fileno: 0
    ForkPoolWorker-2 using fileno: 1
    ForkPoolWorker-3 using fileno: 2
    ForkPoolWorker-4 using fileno: 0
    ForkPoolWorker-5 using fileno: 1
    ForkPoolWorker-6 using fileno: 2
    [0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2]
    
    Process finished with exit code 0
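
    To tie this back to the logging use case, the same initializer could open the per-slot log file once its lock is acquired and register the file's cleanup alongside the lock release. A sketch (the log-file handling is my assumption, not part of the code above):

    from multiprocessing.util import Finalize


    def init_fileno(file_locks):
        for i, lock in enumerate(file_locks):
            if lock.acquire(False):
                globals()['fileno'] = i
                log_file = open('worker_{}.log'.format(i), 'a')
                globals()['log_file'] = log_file
                # higher exitpriority runs first: close the file,
                # then release the lock for a replacement worker
                Finalize(lock, log_file.close, exitpriority=16)
                Finalize(lock, lock.release, exitpriority=15)
                break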
    

    Note that with Python 3 you can't reliably use Pool's context manager in place of the explicit close()/join() shown above. Pool's context manager (unfortunately) calls terminate(), which might kill worker processes before their finalizers have had a chance to run.
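
    For illustration, this is the pattern to avoid with the finalizer approach above:

    # anti-pattern here: __exit__() calls terminate(), so workers
    # may be killed before their finalizers run, leaving locks
    # permanently acquired
    with Pool(n_proc, initializer=init_fileno,
              initargs=(file_locks,), maxtasksperchild=2) as pool:
        pool.map(f, [50e6] * 18)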