I have used a number of Python web servers, including the standard http.server, Flask, Tornado, Dash, Twisted, and CherryPy, and I have also read up on Django. As far as I can tell, none of these has anything remotely resembling true multi-threading. With Django, for example, the recommendation is to use Celery, which is a completely separate queue-based task manager. Yes, we can always resort to external queueing: but that just confirms there is nothing native that comes closer to multithreading (in process). I am well aware of the GIL, but I would at least look for something that shares the same code - akin to fork for a C program.
One thought is to try the multiprocessing library. In fact there is a Q&A on that approach with an accepted answer: https://stackoverflow.com/a/28149481/1056563 . However that approach is pure socket TCP/IP: it does not include the important HTTP handling support. That leaves far too much work to be re-implemented (including round objects such as the wheel).
Is there any way to merge the multiprocessing library approach with an available web server library such as Twisted, Tornado, Dash, etc.? Otherwise, how do we use their useful HTTP handling capabilities?
Update: We have a mix of workloads, and we need to be able to leverage multiple CPUs on a given machine to handle that mix of tasks/workloads concurrently.
If you need several HTTP web servers to handle just HTTP requests, you can use Gunicorn, which creates several instances of your app as child processes.
If you have CPU-bound ops, they will eventually block all HTTP ops, so they should be distributed to other processes. So on start, each of your HTTP servers creates several child processes which do the heavy tasks.
So the scheme is: Gunicorn -> HTTP servers -> CPU-heavy processes.
Example with aiohttp:
from aiohttp import web
import asyncio
import multiprocessing as mp
from random import randint


def cpu_heavy_operation(num):
    """Just some CPU-heavy task"""
    if num not in range(1, 10):
        return 0
    return str(num**1000000)[0:10]


def process_worker(q: mp.Queue, name: str):
    """Target function for mp.Process. Better to convert it to a class"""
    print(f"{name} Started worker process")
    while True:
        i = q.get()
        if i == "STOP":  # poison pill to stop the child process gracefully
            break
        else:
            print(f"{name}: {cpu_heavy_operation(i)}")
    print(f"{name} Finished worker process")


async def add_another_worker_process(req: web.Request) -> web.Response:
    """Create one more child process"""
    q = req.app["cpu_bound_q"]
    name = randint(100000, 999999)
    pr = mp.Process(
        daemon=False,
        target=process_worker,
        args=(q, f"CPU-Bound_Pr-New-{name}",),
    )
    pr.start()
    req.app["children_pr"] += 1
    return web.json_response({"New": name, "Children": req.app["children_pr"]})


async def test_endpoint(req: web.Request) -> web.Response:
    """Endpoint which feeds the child processes with tasks"""
    x = req.match_info.get("num")
    req.app["cpu_bound_q"].put(int(x))
    return web.json_response({"num": x})


async def stop_ops(app: web.Application) -> None:
    """To do graceful shutdowns"""
    for i in range(app["children_pr"]):
        app["cpu_bound_q"].put("STOP")
    await asyncio.sleep(30)  # give child processes a chance to stop gracefully, without blocking the event loop


async def init_func_standalone(args=None) -> web.Application:
    """Application factory for standalone run"""
    app = web.Application()
    app.router.add_get(r"/test/{num:\d+}", test_endpoint)
    app.router.add_get("/add", add_another_worker_process)

    # create the CPU-bound worker processes
    cpu_bound_q = mp.Queue()
    prcs = [
        mp.Process(
            daemon=False,
            target=process_worker,
            args=(cpu_bound_q, f"CPU-Bound_Pr-{i}",),
        ) for i in range(4)
    ]
    for pr in prcs:
        pr.start()
    app["children_pr"] = 4  # you should know how many child processes you need to stop gracefully
    app["cpu_bound_q"] = cpu_bound_q  # Queue for CPU-bound ops - multiprocessing module
    app.on_cleanup.append(stop_ops)
    return app


async def init_func_gunicorn() -> web.Application:
    """Used to run aiohttp with Gunicorn"""
    app = await init_func_standalone()
    return app


if __name__ == '__main__':
    _app = init_func_standalone()  # web.run_app accepts the coroutine returned by the factory
    web.run_app(_app, host='0.0.0.0', port=9999)
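To run the app under Gunicorn rather than standalone, the invocation would look roughly like this (assuming the code above is saved as main.py; the module name, port and worker count are placeholders):

gunicorn main:init_func_gunicorn --bind 0.0.0.0:9999 --worker-class aiohttp.GunicornWebWorker --workers 4

Each Gunicorn worker then runs its own copy of the app, and each copy spawns its own set of CPU-bound child processes, which is exactly the Gunicorn -> HTTP servers -> CPU-heavy processes scheme described above.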
You see that I use multiprocessing; I do it because I like to have more manual control. The other option is to go with concurrent.futures: asyncio has the run_in_executor method, so just create a pool and send the CPU-heavy tasks to run_in_executor, wrapping them in tasks with asyncio's create_task where needed.
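For completeness, here is a minimal sketch of that run_in_executor alternative (an illustration, not part of the example above; the route, handler names and pool size are assumptions):

import asyncio
from concurrent.futures import ProcessPoolExecutor
from aiohttp import web

def cpu_heavy_operation(num):
    """Same kind of CPU-heavy task as above"""
    return str(num**1000000)[0:10]

async def heavy_endpoint(req: web.Request) -> web.Response:
    """Off-load the CPU-bound call to the process pool and await the result"""
    num = int(req.match_info["num"])
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(req.app["pool"], cpu_heavy_operation, num)
    return web.json_response({"num": num, "result": result})

async def close_pool(app: web.Application) -> None:
    """Shut the pool down on cleanup"""
    app["pool"].shutdown(wait=True)

async def init_app() -> web.Application:
    app = web.Application()
    app["pool"] = ProcessPoolExecutor(max_workers=4)  # one pool shared by all handlers
    app.router.add_get(r"/heavy/{num:\d+}", heavy_endpoint)
    app.on_cleanup.append(close_pool)
    return app

if __name__ == "__main__":
    web.run_app(init_app(), host="0.0.0.0", port=9998)

With this variant the event loop stays responsive while the pool processes chew on the heavy work, and there is no explicit queue or poison pill because the executor manages its worker processes for you.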