Tags: python, multiprocessing, queue, python-multiprocessing, gunicorn

GUnicorn: Queue not working after re-starting worker


Problem Statement

After booting the GUnicorn worker processes, I want the workers to still be able to receive data from another process. Currently, I'm trying to use multiprocessing.Queue to achieve this. Specifically, I start a data management process before forking the workers and use two queues to connect it with the workers: one queue is for the workers to request data from the data management process, the other is for receiving the data. In the post_fork hook, a worker sends a request to the request queue and receives a response on the response queue, and only then proceeds to serve the application.

This works fine at first. However, when I manually terminate a worker and Gunicorn restarts it, the new worker gets stuck in the post_fork hook and never receives a response from the data management process.

Minimal Example

The following code shows a minimal example (config.py):

import logging
import os
import multiprocessing
logging.basicConfig(level=logging.INFO)

bind = "localhost:8080"
workers = 1


def s(req_q: multiprocessing.Queue, resp_q: multiprocessing.Queue):
    # Data management process: answer every request with this process's PID.
    while True:
        logging.info("Waiting for messages")
        other_pid = req_q.get()
        logging.info("Got a message from %d", other_pid)
        resp_q.put(os.getpid())


# Create the queues and start the data management process in the master,
# before Gunicorn forks the workers.
m = multiprocessing.Manager()
q1 = m.Queue()
q2 = m.Queue()


proc = multiprocessing.Process(target=s, args=(q1, q2), daemon=True)
proc.start()


def post_fork(server, worker):
    # Runs in each worker right after it has been forked.
    logging.info("Sending request")
    q1.put(os.getpid())
    logging.info("Request sent")
    other_pid = q2.get()
    logging.info("Got response from %d", other_pid)

My application module (app.py) is:

from flask import Flask
app = Flask(__name__)

And I start the server via

$ gunicorn -c config.py app:app
INFO:root:Waiting for messages
[2023-01-31 14:20:46 +0800] [24553] [INFO] Starting gunicorn 20.1.0
[2023-01-31 14:20:46 +0800] [24553] [INFO] Listening at: http://127.0.0.1:8080 (24553)
[2023-01-31 14:20:46 +0800] [24553] [INFO] Using worker: sync
[2023-01-31 14:20:46 +0800] [24580] [INFO] Booting worker with pid: 24580
INFO:root:Sending request
INFO:root:Request sent
INFO:root:Got a message from 24580
INFO:root:Waiting for messages
INFO:root:Got response from 24574

The log shows that the messages were successfully exchanged. Now, we'll stop the worker process and let gunicorn restart it:

$ kill 24580
[2023-01-31 14:22:40 +0800] [24580] [INFO] Worker exiting (pid: 24580)
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/util.py", line 319, in _exit_function
    p.join()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 122, in join
    assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process
[2023-01-31 14:22:40 +0800] [24553] [WARNING] Worker with pid 24574 was terminated due to signal 15
[2023-01-31 14:22:40 +0800] [29497] [INFO] Booting worker with pid: 29497
INFO:root:Sending request
INFO:root:Request sent

Question

Why doesn't s receive the message from the worker after it is restarted?

Besides, why is this 'can only join a child process' error thrown? Does it have something to do with the problem?

Environment

  • Python: 3.8.0
  • GUnicorn: 20.1.0
  • OS: Ubuntu 18.04

Related Questions

In this question, a similar problem is presented, and the solution was to use a queue created via multiprocessing.Manager(). However, this didn't solve the issue in my case.

Side Note

I already considered the following alternative designs:

  • Use HTTP/gRPC/... to share the data: The data that I need to share isn't serializable
  • Use threading.Thread instead of multiprocessing.Process for the data management process: The data management process initializes an object that will throw an error when it is forked, so I cannot initialize this object within the GUnicorn master process.

Solution

  • Gunicorn Issue #1621 somewhat answers my question. As far as I understand the short statement there, the problem is that Gunicorn uses os.fork and not multiprocessing, so the utilities in multiprocessing apparently aren't guaranteed to work with Gunicorn.

    So, instead of directly using multiprocessing.Queue, I replicate the behavior of Queue with another IPC library. Internally, Queue uses a ForkingPickler to serialize the data, and this serialized data can also be sent via other IPC libraries, such as ZeroMQ, so I don't necessarily need the multiprocessing module for this. Unfortunately, directly replacing the queues with corresponding ZeroMQ code in the above config exhibits the same behavior as in the question.
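
    To illustrate that point, here is a minimal sketch of serializing a payload the same way multiprocessing.Queue does internally and then shipping the resulting bytes over any other transport; the dump_payload/load_payload helper names are mine and not part of any library:

    import pickle
    from multiprocessing.reduction import ForkingPickler


    def dump_payload(obj) -> bytes:
        # Serialize the object the way multiprocessing.Queue does internally.
        return bytes(ForkingPickler.dumps(obj))


    def load_payload(data: bytes):
        # ForkingPickler.loads is simply pickle.loads.
        return pickle.loads(data)

    For data that pickles normally this is equivalent to using pickle directly, which is why the final code below simply uses pickle.dumps and pickle.loads.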

    This problem can be eliminated by putting the complete data management code (the former multiprocessing service) into a separate script, so that the service process s isn't a child process of Gunicorn anymore. This leads to the following code:

    config.py:

    import logging
    import os
    import pickle
    
    import zmq
    
    logging.basicConfig(level=logging.INFO)
    
    bind = "localhost:8080"
    workers = 1
    zmq_url = "tcp://127.0.0.1:5555"
    
    
    def post_fork(server, worker):
        logging.info("Connecting")
        context = zmq.Context()
        with context.socket(zmq.REQ) as socket:
            socket.connect(zmq_url)
            logging.info("Sending request")
            socket.send(pickle.dumps(os.getpid()))
            logging.info("Waiting for a response")
            other_pid = pickle.loads(socket.recv())
        logging.info("Got response from %d", other_pid)
    

    server.py:

    import logging
    import os
    import pickle
    
    import zmq
    
    
    # This runs as a standalone process (started by the script below),
    # so it is not a child of the Gunicorn master.
    def serve(url):
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(url)
        while True:
            logging.info("Waiting for requests on %s", url)
            message = socket.recv()
            logging.info("Got a message from %d", pickle.loads(message))
            socket.send(pickle.dumps(os.getpid()))
    
    
    if __name__ == '__main__':
        logging.basicConfig(level=logging.INFO)
        serve("tcp://127.0.0.1:5555")
    

    The startup script looks somewhat like this:

    #!/usr/bin/env bash
    set -euo pipefail
    
    python server.py &
    gunicorn -c config.py app:app
    

    This worked reliably during my testing, also for killed and restarted workers.
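
    One further hardening I would consider, not shown in the code above: if server.py isn't running yet, the post_fork hook blocks on socket.recv() indefinitely, similar to the original symptom. pyzmq's RCVTIMEO socket option can be used to fail fast instead. A minimal sketch of such a variant of the hook (the 5-second timeout is an arbitrary choice of mine):

    import logging
    import os
    import pickle

    import zmq

    zmq_url = "tcp://127.0.0.1:5555"


    def post_fork(server, worker):
        logging.info("Connecting")
        context = zmq.Context()
        with context.socket(zmq.REQ) as socket:
            # Give up after 5 seconds instead of blocking forever.
            socket.setsockopt(zmq.RCVTIMEO, 5000)
            # Don't block on close if the request could not be delivered.
            socket.setsockopt(zmq.LINGER, 0)
            socket.connect(zmq_url)
            logging.info("Sending request")
            socket.send(pickle.dumps(os.getpid()))
            logging.info("Waiting for a response")
            try:
                other_pid = pickle.loads(socket.recv())
            except zmq.Again:
                raise RuntimeError("No response from the data management process")
        logging.info("Got response from %d", other_pid)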