Tags: python, multithreading, webserver, python-multiprocessing

Multiprocessing with any popular Python webserver


I have used a number of Python webservers including the standard http.server, Flask, Tornado, Dash, Twisted, and CherryPy, and I have also read up on Django. AFAICT none of these has anything remotely resembling true multithreading. With Django, for example, the recommendation is to use Celery, which is a completely separate queue-based task manager. Yes, we can always resort to external queueing, but that just confirms there is nothing native that comes closer to in-process multithreading. I am very aware of the GIL, but I would at least look for workers sharing the same code - akin to fork for a C program.
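
To make concrete what I mean by fork-like: the closest stdlib analogue I can point to is mixing socketserver.ForkingMixIn into http.server.HTTPServer, which forks a child per request while keeping the HTTP parsing (rough sketch below, Unix-only; class names and port are just for illustration). But that is per-request forking rather than a managed pool of workers, and none of the frameworks above seem to offer a pooled equivalent natively.

    import http.server
    import socketserver


    class ForkingHTTPServer(socketserver.ForkingMixIn, http.server.HTTPServer):
        """HTTPServer that forks one child process per request (Unix only)."""


    class EchoHandler(http.server.BaseHTTPRequestHandler):
        def do_GET(self):
            # runs in a forked child, so CPU-heavy work here does not
            # block the parent process or other in-flight requests
            body = b"handled in a forked child\n"
            self.send_response(200)
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)


    if __name__ == "__main__":
        ForkingHTTPServer(("0.0.0.0", 8000), EchoHandler).serve_forever()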

One thought is to use the multiprocessing library, and in fact there is a Q&A on that approach, with the accepted answer at https://stackoverflow.com/a/28149481/1056563. However, that approach seems to be raw TCP/IP sockets only: it does not include the important HTTP handling support. That leaves far too much to be re-implemented (including round objects such as the wheel).

Is there any way to merge the multiprocessing-library approach with an available webserver library such as Twisted, Tornado, Dash, etc.? Otherwise, how do we use their useful HTTP handling capabilities?

Update: We have a mix of workloads:

  • small/quick responses (sub-millisecond CPU): e.g. a couple of RDBMS calls
  • moderate compute (double-digit millisecond CPU): e.g. encryption/decryption of audio files
  • significant compute (hundreds of milliseconds to single-digit seconds): e.g. signal processing of audio and image files

We do need to be able to leverage multiple CPUs on a given machine to handle this mix of tasks/workloads concurrently.


Solution

  • If you need several HTTP server processes that just handle the HTTP requests, you can use Gunicorn, which creates several instances of your app as child processes.

    If you have CPU-bound operations, they will eventually block all HTTP handling, so they should be distributed to other processes. So on startup, each of your HTTP servers creates several child processes which do the heavy tasks.

    So the scheme is Gunicorn -> HTTP servers -> CPU-heavy processes.

    Example with aiohttp:

    from aiohttp import web
    import asyncio
    import multiprocessing as mp
    from random import randint
    
    
    def cpu_heavy_operation(num):
        """Just some CPU heavy task"""
        if num not in range(1, 10):
            return 0
        return str(num**1000000)[0:10]
    
    
    def process_worker(q: mp.Queue, name: str):
        """Target function for mp.Process. Better convert it to class"""
        print(f"{name} Started worker process")
        while True:
            i = q.get()
            if i == "STOP":  # poison pill to stop child process gracefully
                break
            else:
                print(f"{name}: {cpu_heavy_operation(i)}")
        print(f"{name} Finished worker process")
    
    
    async def add_another_worker_process(req: web.Request) -> web.Response:
        """Create another one child process"""
        q = req.app["cpu_bound_q"]
        name = randint(100000, 999999)
        pr = mp.Process(
            daemon=False,
            target=process_worker,
            args=(q, f"CPU-Bound_Pr-New-{name}",),
        )
        pr.start()
        req.app["children_pr"] += 1
        return web.json_response({"New": name, "Children": req.app["children_pr"]})
    
    
    async def test_endpoint(req: web.Request) -> web.Response:
        """Just endpoint which feed child processes with tasks"""
        x = req.match_info.get("num")
        req.app["cpu_bound_q"].put(int(x))
        return web.json_response({"num": x})
    
    
    async def stop_ops(app: web.Application) -> None:
        """To do graceful shutdowns"""
        for i in range(app["children_pr"]):
            app["cpu_bound_q"].put("STOP")
    
        await asyncio.sleep(30)  # give child processes a chance to stop gracefully without blocking the loop
    
    
    async def init_func_standalone(args=None) -> web.Application:
        """Application factory for standalone run"""
        app = web.Application()
        app.router.add_get(r"/test/{num:\d+}", test_endpoint)
        app.router.add_get("/add", add_another_worker_process)
    
        # create cpu_bound_ops processes block
        cpu_bound_q = mp.Queue()
        prcs = [
            mp.Process(
                daemon=False,
                target=process_worker,
                args=(cpu_bound_q, f"CPU-Bound_Pr-{i}",),
            ) for i in range(4)
        ]
        for pr in prcs:
            pr.start()
        app["children_pr"] = 4  # you should know how many children processes you need to stop gracefully
        app["cpu_bound_q"] = cpu_bound_q  # Queue for cpu bound ops - multiprocessing module
    
        app.on_cleanup.append(stop_ops)
    
        return app
    
    
    async def init_func_gunicorn() -> web.Application:
        """is used to run aiohttp with Gunicorn"""
        app = await init_func_standalone()
        return app
    
    if __name__ == '__main__':
        _app = init_func_standalone()  # web.run_app accepts a coroutine that returns an Application
        web.run_app(_app, host='0.0.0.0', port=9999)
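
    The init_func_gunicorn factory above is what you would point Gunicorn at. Assuming the code is saved as your_module.py (a placeholder name), the invocation would look roughly like this, using aiohttp's Gunicorn worker class (see the aiohttp deployment docs for details):

        gunicorn your_module:init_func_gunicorn --worker-class aiohttp.GunicornWebWorker --workers 4 --bind 0.0.0.0:9999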
    

    You can see that I use multiprocessing directly; I do it because I like to have more manual control. The other option is to go with concurrent.futures: asyncio has a run_in_executor method, so just create a pool and send the CPU-heavy tasks to run_in_executor, wrapping them with asyncio's create_task first.
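
    A minimal sketch of that run_in_executor alternative (the pool size, route, and names here are illustrative assumptions, and it reuses the same cpu_heavy_operation as above):

    import asyncio
    from concurrent.futures import ProcessPoolExecutor

    from aiohttp import web


    def cpu_heavy_operation(num):
        """Same CPU-heavy task as in the example above."""
        if num not in range(1, 10):
            return 0
        return str(num**1000000)[0:10]


    async def test_endpoint(req: web.Request) -> web.Response:
        """Offload the CPU-bound call to the process pool; the event loop stays free."""
        num = int(req.match_info["num"])
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(req.app["pool"], cpu_heavy_operation, num)
        return web.json_response({"num": num, "result": result})


    async def close_pool(app: web.Application) -> None:
        """Shut the executor down on cleanup."""
        app["pool"].shutdown(wait=True)


    async def init_func() -> web.Application:
        app = web.Application()
        app.router.add_get(r"/test/{num:\d+}", test_endpoint)
        app["pool"] = ProcessPoolExecutor(max_workers=4)  # size this to your CPU count
        app.on_cleanup.append(close_pool)
        return app


    if __name__ == "__main__":
        web.run_app(init_func(), host="0.0.0.0", port=9998)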