Tags: python, logging, python-multiprocessing

Logging to Queue during multiprocessing fails


TL;DR: Why is there no handler listed for the logger at the (inside run) print statement in the console view?

I'm looking for an explanation of why this logging scheme is not working properly.

I'm following the recipe (pretty closely) for logging from multiple processes to the same log file found here in the Python docs. The main difference in the code below is that I'm attempting to implement a Worker class instead of just a function. Perhaps I can switch back to a function, but a class would fit better in the larger scheme of the project. Anyhow...

Following the basic guidance of the docs, I can get the logging listener function up and running fine, but things fall apart when the worker tries to log. I can see that the queue handler is added to the worker logger in the configuration function, which is called during __init__, and the logging statement in __init__ is captured in the log as expected.

However, inside the run() function, the wheels fall off and it is as if the logger that was created has forgotten about its handlers.

I'm guessing there is some nuance to how the multiprocessing processes are started that causes this, but it took some time to troubleshoot, and I'd be interested in an explanation of why this doesn't work, or a better way to configure a logger for a worker Process that is a class.

Code:

import logging
import random
import sys
import time
from logging import handlers
from multiprocessing import Queue, Process
from queue import Empty, Full


def worker_configurer(log_queue, idx):
    logger = logging.getLogger(".".join(("A", "worker", str(idx))))
    h = handlers.QueueHandler(log_queue)
    logger.addHandler(h)
    logger.setLevel(logging.INFO)
    print(
        f"configured worker {idx} with logger {logger.name} with handlers: {logger.handlers.copy()}"
    )
    return logger


class Worker(Process):
    worker_idx = 0

    def __init__(self, work_queue, log_queue, worker_configurer, **kwargs):
        super(Worker, self).__init__()
        self.idx = Worker.worker_idx
        Worker.worker_idx += 1
        self.logger = worker_configurer(log_queue, self.idx)
        print(f"self.logger handlers during init: {self.logger.handlers.copy()}")
        self.logger.info(f"worker {self.idx} initialized")  # <-- does show up in log
        self.work_queue = work_queue

    def run(self):
        print(
            f"(inside run): self.logger name: {self.logger.name}, handlers:"
            f" {self.logger.handlers.copy()}"
        )

        self.logger.info(f"worker {self.idx} started!")  # <-- will NOT show up in log
        while True:
            job_duration = self.work_queue.get()
            if job_duration is None:
                print(f"Worker {self.idx} received stop signal")
                break
            time.sleep(job_duration)
            # book the job...
            print(f"worker {self.idx} finished job of length {job_duration}")
            self.logger.info(f"worker {self.idx} finished job of length {job_duration}")


def listener_configurer():
    logging.basicConfig(
        filename="mp_log.log",
        filemode="a",
        format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
        datefmt="%d-%b-%y %H:%M:%S",
        level=logging.INFO,
    )


def listener_process(queue, configurer):
    configurer()  # redundant (for now), but harmless
    logger = logging.getLogger("A")
    while True:
        try:
            record = queue.get(timeout=5)
            print("bagged a message from the queue")
            if (
                record is None
            ):  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Empty:
            pass
        except Exception:
            import sys, traceback

            print("Whoops! Problem:", file=sys.stderr)
            traceback.print_exc(file=sys.stderr)


if __name__ == "__main__":
    listener_configurer()
    logger = logging.getLogger("A")
    logger.warning("Logger Active!")
    work_queue = Queue(5)
    log_queue = Queue(100)

    # start the logging listener
    listener = Process(target=listener_process, args=(log_queue, listener_configurer))
    listener.start()
    # make workers
    num_workers = 2
    workers = []
    for i in range(num_workers):
        w = Worker(
            work_queue,
            log_queue=log_queue,
            worker_configurer=worker_configurer,
        )
        w.start()
        workers.append(w)
        logger.info(f"worker {i} created")

    num_jobs = 10
    jobs_assigned = 0
    while jobs_assigned < num_jobs:
        try:
            work_queue.put(random.random() * 2, timeout=0.1)
            jobs_assigned += 1
        except Full:
            pass

    print("Call it a day and send stop sentinel to everybody")
    for i in range(num_workers):
        work_queue.put(None)
    log_queue.put(None)

    for w in workers:
        w.join()
        print("another worker retired!")

    listener.join()

Console:

configured worker 0 with logger A.worker.0 with handlers: [<QueueHandler (NOTSET)>]
self.logger handlers during init: [<QueueHandler (NOTSET)>]
configured worker 1 with logger A.worker.1 with handlers: [<QueueHandler (NOTSET)>]
self.logger handlers during init: [<QueueHandler (NOTSET)>]
bagged a message from the queue
bagged a message from the queue
(inside run): self.logger name: A.worker.1, handlers: []
(inside run): self.logger name: A.worker.0, handlers: []
worker 0 finished job of length 1.2150712953970373
worker 1 finished job of length 1.2574239731920005
worker 1 finished job of length 0.11736058130132943
Call it a day and send stop sentinel to everybody
worker 0 finished job of length 0.4843796181316009
worker 1 finished job of length 1.048915894468737
bagged a message from the queue
worker 0 finished job of length 1.2749454212499574
worker 0 finished job of length 0.7298640313585205
worker 1 finished job of length 1.6144333153092076
worker 1 finished job of length 1.219077068714904
Worker 1 received stop signal
worker 0 finished job of length 1.561689295025705
Worker 0 received stop signal
another worker retired!
another worker retired!

Log File:

08-May-24 17:33:15 | A | WARNING | Logger Active!
08-May-24 17:33:15 | A.worker.0 | INFO | worker 0 initialized
08-May-24 17:33:15 | A | INFO | worker 0 created
08-May-24 17:33:15 | A.worker.1 | INFO | worker 1 initialized
08-May-24 17:33:15 | A | INFO | worker 1 created
08-May-24 17:33:15 | A.worker.0 | INFO | worker 0 initialized
08-May-24 17:33:15 | A.worker.1 | INFO | worker 1 initialized

Solution

  • first

    You assume the first log action in the __init__ is successful in the new Process.

    No, you see it in the main process, not the new process. You can print self.pid inside __init__ to prove it: as long as only __init__ has executed, self.pid is None.

    The new process is created only after start() is called, not when the instance is created.
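
    A minimal sketch to demonstrate this (the class name is illustrative, not from the original post): __init__ runs in the parent, where self.pid is still None; run() executes in the child created by start().

    import os
    from multiprocessing import Process

    class PidDemo(Process):
        def __init__(self):
            super().__init__()
            # __init__ runs in the parent; the child does not exist yet,
            # so self.pid is still None here.
            print(f"__init__ in process {os.getpid()}, self.pid={self.pid}")

        def run(self):
            # run() executes in the child created by start(); os.getpid()
            # now reports the child's pid.
            print(f"run() in process {os.getpid()}, self.pid={self.pid}")

    if __name__ == "__main__":
        p = PidDemo()
        p.start()
        p.join()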

  • second

    Your code initializes the logger handler inside the main process, not the new process. You should either initialize the logger inside the new worker (a sketch follows at the end of this answer) or set the start method:

    multiprocessing.set_start_method('fork')
    
    spawn
      ... The child process will only inherit those resources necessary to run the process object’s run() method. 
      In particular, unnecessary file descriptors and handles from the parent process will not be inherited.
    
    fork
      ... All resources of the parent are inherited by the child process. 
    

    docs.python.org: multiprocessing start methods
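
    For the first option (initializing the logger inside the new worker), here is a minimal sketch of how the Worker class from the question could be rearranged, assuming the rest of the question's script (imports, worker_configurer, the queues) stays the same: keep only picklable state in __init__ and call worker_configurer at the top of run(), so the QueueHandler is attached to a logger that lives in the child process. This works with either start method.

    class Worker(Process):
        worker_idx = 0

        def __init__(self, work_queue, log_queue, worker_configurer, **kwargs):
            super().__init__()
            self.idx = Worker.worker_idx
            Worker.worker_idx += 1
            # Store only picklable state here; defer logger creation to run().
            self.work_queue = work_queue
            self.log_queue = log_queue
            self.worker_configurer = worker_configurer

        def run(self):
            # Now we are in the child process, so the QueueHandler is added to
            # this process's own logger and stays attached for the worker's lifetime.
            self.logger = self.worker_configurer(self.log_queue, self.idx)
            self.logger.info(f"worker {self.idx} started!")  # now shows up in the log
            while True:
                job_duration = self.work_queue.get()
                if job_duration is None:
                    print(f"Worker {self.idx} received stop signal")
                    break
                time.sleep(job_duration)
                print(f"worker {self.idx} finished job of length {job_duration}")
                self.logger.info(f"worker {self.idx} finished job of length {job_duration}")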