TL;DR: Why does the logger show no handlers at the (inside run) print statement in the console output?

I'm looking for an explanation of why this logging scheme isn't working properly.

I'm following the recipe (pretty closely) for logging from multiple processes to a single file, found here in the Python docs. The main difference in the code below is that I'm implementing a Worker class instead of just a function. I could switch back to a function, but a class fits better in the larger scheme of the project. Anyhow...

Following the basic guidance of the docs, I can get the logging listener process up and running fine, but things fall apart when a worker tries to log. I can see that the QueueHandler is added to the worker's logger by the configuration function, which is called during __init__, and the logging statement in __init__ is captured in the log as expected. However, inside run() the wheels fall off, and it is as if the logger has forgotten about its handlers.

I'm guessing there is some nuance to how the multiprocessing processes are started that causes this, but it took some time to troubleshoot, and I'd be interested in an explanation of why this doesn't work, or in a better way to configure a logger for a worker Process that is a class.
import logging
import random
import sys
import time
from logging import handlers
from multiprocessing import Queue, Process
from queue import Empty, Full


def worker_configurer(log_queue, idx):
    logger = logging.getLogger(".".join(("A", "worker", str(idx))))
    h = handlers.QueueHandler(log_queue)
    logger.addHandler(h)
    logger.setLevel(logging.INFO)
    print(
        f"configured worker {idx} with logger {logger.name} with handlers: {logger.handlers.copy()}"
    )
    return logger


class Worker(Process):
    worker_idx = 0

    def __init__(self, work_queue, log_queue, worker_configurer, **kwargs):
        super(Worker, self).__init__()
        self.idx = Worker.worker_idx
        Worker.worker_idx += 1
        self.logger = worker_configurer(log_queue, self.idx)
        print(f"self.logger handlers during init: {self.logger.handlers.copy()}")
        self.logger.info(f"worker {self.idx} initialized")  # <-- does show up in log
        self.work_queue = work_queue

    def run(self):
        print(
            f"(inside run): self.logger name: {self.logger.name}, handlers:"
            f" {self.logger.handlers.copy()}"
        )
        self.logger.info(f"worker {self.idx} started!")  # <-- will NOT show up in log
        while True:
            job_duration = self.work_queue.get()
            if job_duration is None:
                print(f"Worker {self.idx} received stop signal")
                break
            time.sleep(job_duration)
            # book the job...
            print(f"worker {self.idx} finished job of length {job_duration}")
            self.logger.info(f"worker {self.idx} finished job of length {job_duration}")


def listener_configurer():
    logging.basicConfig(
        filename="mp_log.log",
        filemode="a",
        format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
        datefmt="%d-%b-%y %H:%M:%S",
        level=logging.INFO,
    )


def listener_process(queue, configurer):
    configurer()  # redundant (for now), but harmless
    logger = logging.getLogger("A")
    while True:
        try:
            record = queue.get(timeout=5)
            print("bagged a message from the queue")
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Empty:
            pass
        except Exception:
            import sys, traceback
            print("Whoops! Problem:", file=sys.stderr)
            traceback.print_exc(file=sys.stderr)


if __name__ == "__main__":
    listener_configurer()
    logger = logging.getLogger("A")
    logger.warning("Logger Active!")
    work_queue = Queue(5)
    log_queue = Queue(100)
    # start the logging listener
    listener = Process(target=listener_process, args=(log_queue, listener_configurer))
    listener.start()
    # make workers
    num_workers = 2
    workers = []
    for i in range(num_workers):
        w = Worker(
            work_queue,
            log_queue=log_queue,
            worker_configurer=worker_configurer,
        )
        w.start()
        workers.append(w)
        logger.info(f"worker {i} created")
    num_jobs = 10
    jobs_assigned = 0
    while jobs_assigned < num_jobs:
        try:
            work_queue.put(random.random() * 2, timeout=0.1)
            jobs_assigned += 1
        except Full:
            pass
    print("Call it a day and send stop sentinel to everybody")
    for i in range(num_workers):
        work_queue.put(None)
    log_queue.put(None)
    for w in workers:
        w.join()
        print("another worker retired!")
    listener.join()
Console output:

configured worker 0 with logger A.worker.0 with handlers: [<QueueHandler (NOTSET)>]
self.logger handlers during init: [<QueueHandler (NOTSET)>]
configured worker 1 with logger A.worker.1 with handlers: [<QueueHandler (NOTSET)>]
self.logger handlers during init: [<QueueHandler (NOTSET)>]
bagged a message from the queue
bagged a message from the queue
(inside run): self.logger name: A.worker.1, handlers: []
(inside run): self.logger name: A.worker.0, handlers: []
worker 0 finished job of length 1.2150712953970373
worker 1 finished job of length 1.2574239731920005
worker 1 finished job of length 0.11736058130132943
Call it a day and send stop sentinel to everybody
worker 0 finished job of length 0.4843796181316009
worker 1 finished job of length 1.048915894468737
bagged a message from the queue
worker 0 finished job of length 1.2749454212499574
worker 0 finished job of length 0.7298640313585205
worker 1 finished job of length 1.6144333153092076
worker 1 finished job of length 1.219077068714904
Worker 1 received stop signal
worker 0 finished job of length 1.561689295025705
Worker 0 received stop signal
another worker retired!
another worker retired!
And here is what ends up in mp_log.log:

08-May-24 17:33:15 | A | WARNING | Logger Active!
08-May-24 17:33:15 | A.worker.0 | INFO | worker 0 initialized
08-May-24 17:33:15 | A | INFO | worker 0 created
08-May-24 17:33:15 | A.worker.1 | INFO | worker 1 initialized
08-May-24 17:33:15 | A | INFO | worker 1 created
08-May-24 17:33:15 | A.worker.0 | INFO | worker 0 initialized
08-May-24 17:33:15 | A.worker.1 | INFO | worker 1 initialized
> the first log action in the __init__ is successful in the new Process

No, you see it in the main process, not in the new process. You can print self.pid inside __init__ to prove it: while only __init__ has executed, pid is None. The new process is created when start() is called, not when the instance is constructed.
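For illustration, a minimal sketch (the Demo class here is hypothetical, just to show the lifecycle):

import os
from multiprocessing import Process

class Demo(Process):
    def __init__(self):
        super().__init__()
        # __init__ runs in the parent; the child process doesn't exist yet,
        # so self.pid is None and os.getpid() is the parent's pid.
        print(f"__init__: os.getpid()={os.getpid()}, self.pid={self.pid}")

    def run(self):
        # run() executes in the child; self.pid is set by now.
        print(f"run: os.getpid()={os.getpid()}, self.pid={self.pid}")

if __name__ == "__main__":
    d = Demo()   # prints ... self.pid=None
    d.start()    # the OS process is only created here
    d.join()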
Your code initializes the logger's handler inside the main process, not the new process. You should either initialize the logger inside the new worker process, or set the start method:
multiprocessing.set_start_method('fork')
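If you take the start-method route, a sketch of where that call would belong: it must run once, inside the main guard, before any queues or processes are created (and note that 'fork' is unavailable on Windows):

import multiprocessing

if __name__ == "__main__":
    # Call once, before creating any Queue or Process objects.
    multiprocessing.set_start_method("fork")
    ...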
Under spawn (the default start method on Windows and macOS), the Worker instance is pickled and re-created in the child process, and loggers pickle by name only; that is why the logger's name survives into run() but its handlers don't. From the multiprocessing docs:

spawn
    ... The child process will only inherit those resources necessary to run the process object's run() method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited.

fork
    ... All resources of the parent are inherited by the child process.
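And a sketch of the other fix, keeping the default start method but deferring logger configuration to run(), so the QueueHandler is attached inside the child process (this reuses worker_configurer from the question; only Worker changes):

from multiprocessing import Process

class Worker(Process):
    worker_idx = 0

    def __init__(self, work_queue, log_queue, worker_configurer, **kwargs):
        super().__init__()
        self.idx = Worker.worker_idx
        Worker.worker_idx += 1
        # Only store what run() will need; don't build the logger here,
        # because __init__ executes in the parent process.
        self.log_queue = log_queue
        self.worker_configurer = worker_configurer
        self.work_queue = work_queue

    def run(self):
        # run() executes in the child, so the handler attached here lives in
        # the right process and survives regardless of start method.
        self.logger = self.worker_configurer(self.log_queue, self.idx)
        self.logger.info(f"worker {self.idx} started!")  # now shows up in the log
        # ... rest of run() unchanged ...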