I am using `multiprocessing.Pool` to run a number of independent tasks in parallel, not much different from the basic example in the Python docs:
```python
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
```
I would like each item to log to a separate file. I log various information from other modules in my codebase and from some third-party packages (none of which is multiprocessing-aware). So, for example, I would like this:
```python
import logging
from multiprocessing import Pool

def logOnDisk(x):
    logging.info("hello world")

if __name__ == '__main__':
    with Pool() as p:
        p.map(logOnDisk, ["ping", "pong", "foo", "bar"])
```
to write these four files to disk: `ping.log`, `pong.log`, `foo.log` and `bar.log`. Each file should contain the string "hello world". How do I achieve this?
To use a separate log file for every processed item (every element of the iterable you pass to `pool.map()`) from inside the workers, nothing multiprocessing-specific is really needed. You can, however, use `Pool`'s `initializer` to set up the root logger inside each worker and to store some metadata for logging. Under the hood, `Pool` passes `initializer` and `initargs` along in the `args` of every worker `Process` it creates, and the worker's target function calls `initializer(*initargs)` once, when that worker starts.
Then, inside the workers, `log_on_disk()` only has to swap the root logger's `FileHandler` for a new one for every item it processes.
```python
import logging
import multiprocessing as mp


def log_on_disk(x):
    logger = _init_logger(file_id=x)
    logger.info("hello world")


def _init_logging(level=logging.INFO, mode='a'):
    """Pool initializer: runs once in every worker process."""
    global _log_meta
    fmt = logging.Formatter(
        '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s --- %(message)s'
    )
    logger = logging.getLogger()
    logger.setLevel(level)
    # keep the settings needed later when the per-item handlers are created
    _log_meta = {'mode': mode, 'fmt': fmt}


def _init_logger(file_id):
    """Point the root logger of the current worker at '<file_id>.log'."""
    logger = logging.getLogger()
    # remove AND close the previous item's handlers so file handles don't leak
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
        handler.close()
    fh = logging.FileHandler(f"{file_id}.log", mode=_log_meta['mode'])
    fh.setFormatter(_log_meta['fmt'])
    logger.addHandler(fh)
    return logger


if __name__ == '__main__':
    with mp.Pool(5, initializer=_init_logging, initargs=(logging.DEBUG,)) as pool:
        pool.map(log_on_disk, ["ping", "pong", "foo", "bar"])
```