Search code examples
pythonpython-3.xasynchronousmultiprocessingdaemon

Gracefully stopping python daemon with child proceses


I'm trying to implement a python daemon in the traditional start/stop/restart style to control a consumer to a messaging queue. I've successfully used python-daemons to create a single consumer, but I need more than one listener for the volume of messages. This led me to implement the multiprocessing library in my run function along with a os.kill call for the stop function:

def run(self):
    for num in range(self.num_instances):
        p = multiprocessing.Process(target=self.start_listening)
        p.start()

def start_listening(self):
    with open('/tmp/pids/listener_{}.pid'.format(os.getpid()), 'w') as f:
         f.write("{}".format(os.getpid()))
    while True:
        // implement message queue listener

def stop(self):         
    for pid in os.listdir('/tmp/pids/'):
        os.kill(int(os.path.basename(pid)), signal.SIGTERM)
    shutil.rmtree('/tmp/pids/')
    super().stop()

This is almost ok, but I'd really like to have a graceful shutdown of the child processes and do some clean up which would include logging. I read about signal handlers so I switched the signal.SIGTERM to signal.SIGINT and added a handler to the daemon class.

def __init__(self):
   ....
   signal.signal(signal.SIGINT, self.graceful_stop)

def stop(self):         
    for pid in os.listdir('/tmp/pids/'):
        os.kill(int(os.path.basename(pid)), signal.SIGINT)
    super().stop()

def graceful_stop(self):
    self.log.deug("Gracefully stopping the child {}".format(os.getpid()))
    os.rm('/tmp/pids/listener_{}.pid".format(os.getpid()))
    ...

However, when tested, the child processes get killed but it doesn't seem like the graceful_stop function never gets called (files remain, logging doesn't get logged, etc). Am I implementing the handler wrong for the child processes? Is there a better way of having multiple listeners with a single control point?


Solution

  • I figured it out. The signal.signal declaration had to be explicitly put in each sub process's start_listening function.

    def start_listening(self):
        signal.signal(signal.SIGINT, self.graceful_stop)
        with open('/tmp/pids/listener_{}.pid'.format(os.getpid()), 'w') as f:
            f.write("{}".format(os.getpid()))
        while True:
            // implement message queue listener