After booting the Gunicorn worker processes, I want the workers to still be able to receive data from another process. Currently, I'm trying to use multiprocessing.Queue to achieve this. Specifically, I start a data management process before forking the workers and connect it to the workers with two queues: one for the workers to request data from the data management process, the other to receive the data. In the post_fork hook, a worker sends a request on the request queue, waits for the response on the response queue, and only then proceeds to serving the application.
This works fine at first. However, when I manually terminate a worker and Gunicorn restarts it, the new worker gets stuck in the post_fork hook and never receives a response from the data management process.
The following code shows a minimal example (config.py):
import logging
import os
import multiprocessing

logging.basicConfig(level=logging.INFO)

bind = "localhost:8080"
workers = 1


def s(req_q: multiprocessing.Queue, resp_q: multiprocessing.Queue):
    # Data management process: answer every request with this process's PID.
    while True:
        logging.info("Waiting for messages")
        other_pid = req_q.get()
        logging.info("Got a message from %d", other_pid)
        resp_q.put(os.getpid())


# Started in the master process, before the workers are forked.
m = multiprocessing.Manager()
q1 = m.Queue()  # workers -> data management process (requests)
q2 = m.Queue()  # data management process -> workers (responses)
proc = multiprocessing.Process(target=s, args=(q1, q2), daemon=True)
proc.start()


def post_fork(server, worker):
    logging.info("Sending request")
    q1.put(os.getpid())
    logging.info("Request sent")
    other_pid = q2.get()
    logging.info("Got response from %d", other_pid)
My application module (app.py) is:
from flask import Flask
app = Flask(__name__)
And I start the server via
$ gunicorn -c config.py app:app
INFO:root:Waiting for messages
[2023-01-31 14:20:46 +0800] [24553] [INFO] Starting gunicorn 20.1.0
[2023-01-31 14:20:46 +0800] [24553] [INFO] Listening at: http://127.0.0.1:8080 (24553)
[2023-01-31 14:20:46 +0800] [24553] [INFO] Using worker: sync
[2023-01-31 14:20:46 +0800] [24580] [INFO] Booting worker with pid: 24580
INFO:root:Sending request
INFO:root:Request sent
INFO:root:Got a message from 24580
INFO:root:Waiting for messages
INFO:root:Got response from 24574
The log shows that the messages were successfully exchanged. Now, we'll stop the worker process and let gunicorn restart it:
$ kill 24580
[2023-01-31 14:22:40 +0800] [24580] [INFO] Worker exiting (pid: 24580)
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/util.py", line 319, in _exit_function
p.join()
File "/usr/lib/python3.6/multiprocessing/process.py", line 122, in join
assert self._parent_pid == os.getpid(), 'can only join a child process'
AssertionError: can only join a child process
[2023-01-31 14:22:40 +0800] [24553] [WARNING] Worker with pid 24574 was terminated due to signal 15
[2023-01-31 14:22:40 +0800] [29497] [INFO] Booting worker with pid: 29497
INFO:root:Sending request
INFO:root:Request sent
Why doesn't s receive the message from the worker after it is restarted?
Also, why is the 'can only join a child process' error thrown, and does it have something to do with the problem?
In this question, a similar problem is presented, and the solution was to use "multiprocessing.manager.queue". However, this didn't solve the issue in my case.
I already considered the following alternative designs:

- Using threading.Thread instead of multiprocessing.Process for the data management process: the data management process initializes an object that will throw an error when it is forked, so I cannot initialize this object within the Gunicorn master process.

Gunicorn Issue #1621 somewhat answers my question. As far as I understand this short statement, the reason is that Gunicorn uses os.fork and not multiprocessing, so the utilities in multiprocessing apparently aren't guaranteed to work with Gunicorn.
So, instead of directly using multiprocessing.Queue, I replicate the behavior of Queue with another IPC library. Internally, Queue uses a ForkingPickler to serialize the data, and the serialized data can just as well be sent via another IPC library, such as ZeroMQ. So I don't necessarily need the multiprocessing module for this. Unfortunately, directly replacing the Queues in the code above with corresponding ZeroMQ code exhibits the same behavior as described in the question.
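To illustrate the idea, here is a minimal sketch of such a replacement, keeping multiprocessing's serializer but sending the bytes over a ZeroMQ REQ socket. The URL and the helper name are placeholders, not my exact code; and, as said, this variant still hangs in the same way after a worker restart:

# Sketch only: serialize with multiprocessing's ForkingPickler (what Queue uses
# internally) and ship the bytes over ZeroMQ instead of a Queue.
import pickle
from multiprocessing.reduction import ForkingPickler

import zmq

ZMQ_URL = "tcp://127.0.0.1:5555"  # placeholder address


def request_data(payload):
    context = zmq.Context.instance()
    with context.socket(zmq.REQ) as socket:
        socket.connect(ZMQ_URL)
        socket.send(bytes(ForkingPickler.dumps(payload)))  # dumps returns a buffer
        return pickle.loads(socket.recv())  # deserialization is plain pickle

# In post_fork, a worker would then call request_data(os.getpid())
# instead of putting on q1 and getting from q2.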
This problem can be eliminated by putting the complete multiprocessing-related code into another script, so the service process s isn't a child process of Gunicorn anymore. This leads to the following code:
config.py:
import logging
import os
import pickle
import zmq

logging.basicConfig(level=logging.INFO)

bind = "localhost:8080"
workers = 1

zmq_url = "tcp://127.0.0.1:5555"


def post_fork(server, worker):
    logging.info("Connecting")
    context = zmq.Context()
    with context.socket(zmq.REQ) as socket:
        socket.connect(zmq_url)
        logging.info("Sending request")
        socket.send(pickle.dumps(os.getpid()))
        logging.info("Waiting for a response")
        other_pid = pickle.loads(socket.recv())
        logging.info("Got response from %d", other_pid)
server.py:
import logging
import os
import pickle
import zmq


def serve(url):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(url)
    while True:
        logging.info("Waiting for requests on %s", url)
        message = socket.recv()
        logging.info("Got a message from %d", pickle.loads(message))
        socket.send(pickle.dumps(os.getpid()))


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    serve("tcp://127.0.0.1:5555")
The startup script looks somewhat like this:
#!/usr/bin/env bash
set -euo pipefail
python server.py &
gunicorn -c config.py app:app
This worked reliably in my testing, also when workers were killed and restarted.