I'm using Python 2.7's multiprocessing.Pool to manage a pool of 3 workers. Each worker is fairly complicated, and there's a resource leak (presumably) in some third-party code that causes problems after 6-8 hours of continuous runtime, so I'd like to use maxtasksperchild to have workers refreshed periodically.

I'd also like each worker to write to its own separate log file. Without maxtasksperchild, I use a shared multiprocessing.Value to assign an integer (0, 1, or 2) to each worker, then use that integer to name the log file.

With maxtasksperchild, I'd like to reuse log files once a worker is done. So if this whole thing runs for a month, I only want three log files, not one log file for each worker that was spawned.

If I could pass a callback (e.g. a finalizer to go along with the currently supported initializer), this would be straightforward. Without that, I can't see a robust and simple way to do it.
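For reference, here's a minimal sketch of the Value-based setup I currently use; the names (init_worker, work) and the log-writing are simplified for illustration:

from multiprocessing import Pool, Value, current_process

worker_id = None  # set once per worker by the initializer

def init_worker(counter):
    global worker_id
    # Atomically take the next integer (0, 1, 2) as this worker's id.
    with counter.get_lock():
        worker_id = counter.value
        counter.value += 1

def work(item):
    # Each worker appends to the log file named after its id.
    with open('worker-{}.log'.format(worker_id), 'a') as fh:
        fh.write('{} handled {!r}\n'.format(current_process().name, item))
    return item

if __name__ == '__main__':
    counter = Value('i', 0)
    pool = Pool(3, initializer=init_worker, initargs=(counter,))
    print(pool.map(work, range(9)))
    pool.close()
    pool.join()

With maxtasksperchild set, this scheme breaks down: the counter keeps growing with every replacement worker, so the file names do too.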
That's AFAIK undocumented, but multiprocessing has a Finalize class (in multiprocessing.util), "which supports object finalization using weakrefs". You could use it to register a finalizer within your initializer.
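To get a feel for its semantics first, here's a minimal standalone sketch (the Resource class and cleanup function are just placeholders of mine):

from multiprocessing.util import Finalize

def cleanup():
    print("cleanup ran")

class Resource(object):
    pass

res = Resource()
# Runs `cleanup` (once) when `res` is garbage-collected or, since an
# exitpriority is given, at process exit at the latest.
Finalize(res, cleanup, exitpriority=10)

del res  # with CPython's refcounting the finalizer fires right here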
I don't see multiprocessing.Value as a helpful synchronization choice in this case, though. Multiple workers could exit simultaneously, and signaling which file-integers are free is more than a (locked) counter can provide then.

I would suggest using multiple bare multiprocessing.Locks, one for each file, instead:
from multiprocessing import Pool, Lock, current_process
from multiprocessing.util import Finalize
def f(n):
    global fileno
    for _ in range(int(n)):  # xrange for Python 2
        pass
    return fileno


def init_fileno(file_locks):
    # Claim the first free file-slot by acquiring its lock.
    for i, lock in enumerate(file_locks):
        if lock.acquire(False):  # non-blocking attempt
            globals()['fileno'] = i
            print("{} using fileno: {}".format(current_process().name, i))
            # Release the slot when this worker-process exits, so a
            # replacement worker can reuse the same fileno.
            Finalize(lock, lock.release, exitpriority=15)
            break


if __name__ == '__main__':

    n_proc = 3
    file_locks = [Lock() for _ in range(n_proc)]
    pool = Pool(
        n_proc, initializer=init_fileno, initargs=(file_locks,),
        maxtasksperchild=2
    )
    print(pool.map(func=f, iterable=[50e6] * 18))
    pool.close()
    pool.join()
    # all locks should be available if all finalizers did run
    assert all(lock.acquire(False) for lock in file_locks)
Output:
ForkPoolWorker-1 using fileno: 0
ForkPoolWorker-2 using fileno: 1
ForkPoolWorker-3 using fileno: 2
ForkPoolWorker-4 using fileno: 0
ForkPoolWorker-5 using fileno: 1
ForkPoolWorker-6 using fileno: 2
[0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2]
Process finished with exit code 0
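To tie this back to the actual goal, the claimed slot can name a real per-worker log file. Here's a hedged sketch of how the initializer might look; the logging setup (logger name, handler, level) is my assumption, not part of the code above:

import logging
from multiprocessing import current_process
from multiprocessing.util import Finalize

logger = None

def init_logging(file_locks):
    global logger
    for i, lock in enumerate(file_locks):
        if lock.acquire(False):
            handler = logging.FileHandler('worker-{}.log'.format(i), mode='a')
            logger = logging.getLogger('worker')
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
            # Finalizers run in order of decreasing exitpriority: close
            # the handler first, then free the slot for the next worker.
            Finalize(handler, handler.close, exitpriority=16)
            Finalize(lock, lock.release, exitpriority=15)
            break

def work(item):
    logger.info('%s processing %r', current_process().name, item)
    return item

Appending (mode='a') keeps the three files stable over a month of worker turnover, which is what the question asked for.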
Note that with Python 3 you can't reliably use Pool's context manager instead of the explicit close()/join() shown above. Pool's context manager (unfortunately) calls terminate(), which might kill worker-processes before the finalizer has had a chance to run.
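For illustration, this is the variant to avoid (reusing init_fileno, file_locks, f and n_proc from above):

# Pool.__exit__ calls terminate(), not close()/join(), so workers
# may be killed before their Finalize callbacks get to run.
with Pool(n_proc, initializer=init_fileno, initargs=(file_locks,),
          maxtasksperchild=2) as pool:
    print(pool.map(func=f, iterable=[50e6] * 18))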