Tags: python, pytorch, python-multiprocessing, gunicorn

GUnicorn + CUDA: Cannot re-initialize CUDA in forked subprocess


I am building an inference service with PyTorch, Gunicorn and Flask that should use CUDA. To reduce resource requirements, I use Gunicorn's --preload option so that the model is shared between the worker processes. However, this leads to an issue with CUDA. The following code snippet shows a minimal reproducing example:

from flask import Flask, request
import torch

app = Flask('dummy')

model = torch.rand(500)
model = model.to('cuda:0')


@app.route('/', methods=['POST'])
def f():
    data = request.get_json()
    x = torch.rand((data['number'], 500))
    x = x.to('cuda:0')
    res = x * model
    return {
        "result": res.sum().item()
    }

Starting the server with CUDA_VISIBLE_DEVICES=1 gunicorn -w 3 -b $HOST_IP:8080 --preload run_server:app lets the service start successfully. However, on the first request (curl -X POST -d '{"number": 1}'), the worker throws the following error:

[2022-06-28 09:42:00,378] ERROR in app: Exception on / [POST]
Traceback (most recent call last):
  File "/home/user/.local/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/user/.local/lib/python3.6/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/user/.local/lib/python3.6/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/user/.local/lib/python3.6/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/home/user/.local/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/user/.local/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/user/project/run_server.py", line 14, in f
    x = x.to('cuda:0')
  File "/home/user/.local/lib/python3.6/site-packages/torch/cuda/__init__.py", line 195, in _lazy_init
    "Cannot re-initialize CUDA in forked subprocess. " + msg)
RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method

I load the model in the parent process and it's accessible to each forked worker process. The problem occurs when creating a CUDA-backed tensor in the worker process. This re-initializes the CUDA context in the worker process, which fails because it was already initialized in the parent process. If we set x = data['number'] and remove x = x.to('cuda:0'), the inference succeeds.

Adding torch.multiprocessing.set_start_method('spawn') or multiprocessing.set_start_method('spawn') doesn't change anything, presumably because Gunicorn always uses fork when started with the --preload option.

A solution could be not using the --preload option, which leads to multiple copies of the model in memory/GPU. But this is what I am trying to avoid.

Is there any possibility to overcome this issue without loading the model separately in each worker process?


Solution

    Reason for the Error

    As correctly stated in the comments by @Newbie, the issue isn't the model itself but the CUDA context. When a child process is forked, the parent's memory is shared with the child copy-on-write, but an initialized CUDA context cannot be carried over this way: the forked child cannot use the parent's context, and PyTorch refuses to initialize CUDA again in it. Hence, the error shown above is raised.

    Spawn instead of Fork

    To resolve this issue, we have to change the start method for the child processes from fork to spawn with multiprocessing.set_start_method. The following simple example works fine:

    import torch
    import torch.multiprocessing as mp
    
    
    def f(y):
        y[0] = 1000
    
    
    if __name__ == '__main__':
        x = torch.zeros(1).cuda()
        x.share_memory_()
    
        mp.set_start_method('spawn')
        p = mp.Process(target=f, args=(x,), daemon=True)
        p.start()
        p.join()
        print("x =", x.item())
    

    When running this code, a second CUDA context is initialized (this can be observed via watch -n 1 nvidia-smi in a second window), and f is executed once that context has been initialized completely. After this, x = 1000.0 is printed on the console, which confirms that the tensor x was successfully shared between the processes.

    However, Gunicorn internally uses os.fork to start the worker processes, so multiprocessing.set_start_method has no influence on Gunicorn's behavior. Consequently, initializing the CUDA context in the root process must be avoided.
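The point that os.fork is unaffected by the multiprocessing start method can be checked without a GPU. The following is a minimal sketch using only the standard library (Unix only); STATE and fork_and_read_marker are hypothetical names made up for this illustration, and the marker merely stands in for state such as an initialized CUDA context.

```python
import multiprocessing as mp
import os

# Module-level state. A forked child inherits whatever the parent holds at
# fork time, which is what happens to the CUDA bookkeeping under --preload.
STATE = {"marker": "set-at-import"}


def fork_and_read_marker():
    """Fork with os.fork() and return the marker value the child observes."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: report the inherited value and exit immediately.
        os.close(read_fd)
        os.write(write_fd, STATE["marker"].encode())
        os._exit(0)
    # Parent: read the child's report and reap the child.
    os.close(write_fd)
    with os.fdopen(read_fd, "rb") as reader:
        seen = reader.read().decode()
    os.waitpid(pid, 0)
    return seen


if __name__ == "__main__":
    mp.set_start_method("spawn")       # only affects multiprocessing.Process
    STATE["marker"] = "set-in-parent"  # state created after import
    print(fork_and_read_marker())      # set-in-parent
```

The child still sees the value set in the parent, so the start method setting made no difference to the raw fork.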

    Solution for Gunicorn

    In order to share the model among the worker processes, we thus must load the model in one single process and share it with the workers. Luckily, sending a CUDA tensor via a torch.multiprocessing.Queue to another process doesn't copy the parameters on the GPU, so we can use those queues for this problem.

    import time
    
    import torch
    import torch.multiprocessing as mp
    
    
    def f(q):
        y = q.get()
        y[0] = 1000
    
    
    def g(q):
        x = torch.zeros(1).cuda()
        x.share_memory_()
        q.put(x)
        q.put(x)
        while True:
            time.sleep(1)  # this process must live as long as x is in use
    
    
    if __name__ == '__main__':
        queue = mp.Queue()
        pf = mp.Process(target=f, args=(queue,), daemon=True)
        pf.start()
        pg = mp.Process(target=g, args=(queue,), daemon=True)
        pg.start()
        pf.join()
        x = queue.get()
        print("x =", x.item())  # Prints x = 1000.0
    

    For the Gunicorn server, we can use the same strategy: A model server process loads the model and serves it to each new worker process after its fork. In the post_fork hook the worker requests and receives the model from the model server. A Gunicorn configuration could look like this:

    import logging
    
    from client import request_model
    from app import app
    
    logging.basicConfig(level=logging.INFO)
    
    bind = "localhost:8080"
    workers = 1
    zmq_url = "tcp://127.0.0.1:5555"
    
    
    def post_fork(server, worker):
        app.config['MODEL'], app.config['COUNTER'] = request_model(zmq_url)
    

    In the post_fork hook, we call request_model to get the model from the model server and store it in the configuration of the Flask application. In my example, the function request_model lives in the file client.py and is defined as follows:

    import logging
    import os
    
    from torch.multiprocessing.reductions import ForkingPickler
    import zmq
    
    
    def request_model(zmq_url: str):
        logging.info("Connecting")
        context = zmq.Context()
        with context.socket(zmq.REQ) as socket:
            socket.connect(zmq_url)
            logging.info("Sending request")
            socket.send(ForkingPickler.dumps(os.getpid()))
            logging.info("Waiting for a response")
            model = ForkingPickler.loads(socket.recv())
        logging.info("Got response from object server")
        return model
    

    We make use of ZeroMQ for inter-process communication here because it allows us to reference servers by name/address and to outsource the server code into its own application. multiprocessing.Queue and multiprocessing.Process apparently don't work well with Gunicorn. multiprocessing.Queue uses the ForkingPickler internally to serialize the objects, and the module torch.multiprocessing alters it in a way that Torch data structures can be serialized appropriately and reliably. So, we use this class to serialize our model to send it to the worker processes.
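To make the role of the ForkingPickler a bit more tangible, here is a small standard-library sketch of its reducer mechanism. Handle, reduce_handle and rebuild_handle are hypothetical names invented for this illustration; the class only stands in for an object that owns a large buffer. As far as I understand, torch.multiprocessing registers its tensor reducers through the same register hook, but treat that as an assumption rather than a description of its internals.

```python
from multiprocessing.reduction import ForkingPickler


class Handle:
    """Hypothetical object that refers to a large buffer by name."""

    def __init__(self, name, payload=None):
        self.name = name
        self.payload = payload


def rebuild_handle(name):
    # Receiver side: recreate the object from its name only. A real
    # implementation would reattach to the shared buffer here.
    return Handle(name)


def reduce_handle(handle):
    # Sender side: serialize only the name, never the payload itself.
    return rebuild_handle, (handle.name,)


# Every ForkingPickler created from now on uses this reducer for Handle.
ForkingPickler.register(Handle, reduce_handle)


def roundtrip(obj):
    """Serialize and deserialize obj the way request_model and server.py do."""
    return ForkingPickler.loads(ForkingPickler.dumps(obj))


if __name__ == "__main__":
    restored = roundtrip(Handle("weights", payload=b"x" * 1024))
    print(restored.name, restored.payload)  # weights None
```

The serialized message stays small because only the reference travels between the processes, which is the same idea that keeps the model parameters from being copied on the GPU.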

    The model is loaded and served in an application that is completely separate from Gunicorn and defined in server.py:

    from argparse import ArgumentParser
    import logging
    
    import torch
    from torch.multiprocessing.reductions import ForkingPickler
    import zmq
    
    
    def load_model():
        model = torch.nn.Linear(10000, 50000)
        model.cuda()
        model.share_memory()
    
        counter = torch.zeros(1).cuda()
        counter.share_memory_()
        return model, counter
    
    
    def share_object(obj, url):
        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(url)
        while True:
            logging.info("Waiting for requests on %s", url)
            message = socket.recv()
            logging.info("Got a message from %d", ForkingPickler.loads(message))
            socket.send(ForkingPickler.dumps(obj))
    
    
    if __name__ == '__main__':
        parser = ArgumentParser(description="Serve model")
        parser.add_argument("--listen-address", default="tcp://127.0.0.1:5555")
        args = parser.parse_args()
    
        logging.basicConfig(level=logging.INFO)
        logging.info("Loading model")
        model = load_model()
        share_object(model, args.listen_address)
    

    For this test, we use a model of about 2GB in size to see an effect on the GPU memory allocation in nvidia-smi and a small tensor to verify that the data is actually shared among the processes.
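The figure of about 2GB follows from the layer dimensions in load_model. A quick check, assuming the default float32 parameters (4 bytes each); linear_layer_bytes is a helper name made up for this calculation:

```python
def linear_layer_bytes(in_features, out_features, bytes_per_param=4):
    """Memory taken by the weight matrix and bias vector of one linear layer."""
    weight_params = in_features * out_features
    bias_params = out_features
    return (weight_params + bias_params) * bytes_per_param


size = linear_layer_bytes(10000, 50000)
print(f"{size / 1e9:.2f} GB")  # 2.00 GB
```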

    Our sample Flask application runs the model with a random input, counts the number of requests and returns both results:

    from flask import Flask
    import torch
    
    app = Flask(__name__)
    
    
    @app.route("/", methods=["POST"])
    def infer():
        model: torch.nn.Linear = app.config['MODEL']
        counter: torch.Tensor = app.config['COUNTER']
        counter[0] += 1  # not thread-safe
        input_features = torch.rand(model.in_features).cuda()
        return {
            "result": model(input_features).sum().item(),
            "counter": counter.item()
        }
    

    Test

    The example can be run as follows:

    $ python server.py &
    INFO:root:Waiting for requests on tcp://127.0.0.1:5555 
    $ gunicorn -c config.py app:app
    [2023-02-01 16:45:34 +0800] [24113] [INFO] Starting gunicorn 20.1.0
    [2023-02-01 16:45:34 +0800] [24113] [INFO] Listening at: http://127.0.0.1:8080 (24113)
    [2023-02-01 16:45:34 +0800] [24113] [INFO] Using worker: sync
    [2023-02-01 16:45:34 +0800] [24186] [INFO] Booting worker with pid: 24186
    INFO:root:Connecting
    INFO:root:Sending request
    INFO:root:Waiting for a response
    INFO:root:Got response from object server
    

    Using nvidia-smi, we can observe that two processes are now using the GPU, and one of them allocates 2GB more VRAM than the other. Querying the Flask application also works as expected:

    $ curl -X POST localhost:8080
    {"counter":1.0,"result":-23.956459045410156} 
    $ curl -X POST localhost:8080
    {"counter":2.0,"result":-8.161510467529297}
    $ curl -X POST localhost:8080
    {"counter":3.0,"result":-37.823692321777344}
    

    Let's introduce some chaos and terminate our only Gunicorn worker:

    $ kill 24186
    [2023-02-01 18:02:09 +0800] [24186] [INFO] Worker exiting (pid: 24186)
    [2023-02-01 18:02:09 +0800] [4196] [INFO] Booting worker with pid: 4196
    INFO:root:Connecting
    INFO:root:Sending request
    INFO:root:Waiting for a response
    INFO:root:Got response from object server
    

    It's restarting properly and ready to answer our requests.

    Benefit

    Initially, the amount of required VRAM for our service was (SizeOf(Model) + SizeOf(CUDA context)) * Num(Workers). By sharing the weights of the model, we reduce this to SizeOf(Model) + SizeOf(CUDA context) * (Num(Workers) + 1), where the additional CUDA context belongs to the model server process. The saving is therefore SizeOf(Model) * (Num(Workers) - 1) - SizeOf(CUDA context).
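As a worked example of this calculation, the sketch below plugs in numbers. The model size of 2000 MB matches the test model; the 300 MB per CUDA context is a made-up value for illustration only, since the real size depends on the GPU and driver. The sketch also counts the CUDA context of the model server process, because nvidia-smi showed that process on the GPU as well. Both function names are hypothetical.

```python
def vram_without_sharing(model_mb, context_mb, workers):
    """Every worker holds its own model copy and its own CUDA context."""
    return (model_mb + context_mb) * workers


def vram_with_sharing(model_mb, context_mb, workers):
    """One model copy in the model server, one CUDA context per process."""
    processes = workers + 1  # the workers plus the model server
    return model_mb + context_mb * processes


before = vram_without_sharing(2000, 300, workers=3)
after = vram_with_sharing(2000, 300, workers=3)
print(before, after, before - after)  # 6900 3200 3700
```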

    Caveats

    The reliability of this approach depends on the single model server process. If that process terminates, not only will newly started workers get stuck, but the models in the existing workers will become unavailable and all workers will crash at once. The shared tensors/models are only available as long as the server process is running. Even if the model server and the Gunicorn workers are restarted, a short outage is unavoidable. In a production environment, you should thus make sure this server process is kept alive.
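One way to keep the server process alive is a small supervisor loop. The following is only a sketch under simple assumptions (restart on any exit, a fixed number of restarts); keep_alive is a hypothetical helper, and in production a process manager such as systemd would usually take this role. The demo command below just exits with code 3 so that the sketch runs anywhere; for the setup above it would be [sys.executable, "server.py"].

```python
import subprocess
import sys
import time


def keep_alive(command, max_restarts=3, delay_seconds=1.0):
    """Run command and restart it whenever it exits; return all exit codes."""
    exit_codes = []
    for attempt in range(max_restarts + 1):
        process = subprocess.Popen(command)
        exit_codes.append(process.wait())  # blocks until the process ends
        if attempt < max_restarts:
            time.sleep(delay_seconds)      # brief pause before restarting
    return exit_codes


if __name__ == "__main__":
    demo = [sys.executable, "-c", "raise SystemExit(3)"]
    print(keep_alive(demo, max_restarts=1, delay_seconds=0.1))  # [3, 3]
```

Note that restarting the model server alone is not enough: the tensors held by the existing workers are gone, so the workers have to be restarted too in order to request the model again.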

    Additionally, sharing data among different processes can have side effects. When sharing changeable data, proper locks must be used to avoid race conditions.
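The locking pattern can be sketched with the standard library alone. Here a raw shared integer stands in for the shared counter tensor, and the processes are started with the fork context (Unix only) to mirror how Gunicorn creates its workers. The names add_many and run_counter_demo are made up for this example. In a Gunicorn setup the lock would presumably have to be created before the workers are forked so that all of them inherit the same lock; that part is an assumption and not shown here.

```python
import multiprocessing as mp


def add_many(counter, lock, increments):
    """Increment the shared counter, holding the lock for each update."""
    for _ in range(increments):
        with lock:
            counter.value += 1  # read-modify-write, unsafe without the lock


def run_counter_demo(workers=4, increments=1000):
    ctx = mp.get_context("fork")             # same start method as Gunicorn
    counter = ctx.Value("i", 0, lock=False)  # raw shared int, no built-in lock
    lock = ctx.Lock()
    processes = [
        ctx.Process(target=add_many, args=(counter, lock, increments))
        for _ in range(workers)
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    return counter.value


if __name__ == "__main__":
    print(run_counter_demo())  # 4000
```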