I want to run a quart server alongside other coroutines. However, it seems that the server is blocking the event loop. The server responds to the requests, but neither the timing routine nor the queue processing routine are running.
import quart
import asyncio
import signal
import random
q = asyncio.Queue()
app = quart.Quart(__name__)
async def run_timer(start=0):
global q
i = start
while True:
print(f"Current value: {i}")
await asyncio.sleep(2)
i += 1
async def process_queue():
global q
print("Start queue processing")
while True:
val = await q.get()
print(f"Queue: {val}")
# Simulate blocking operation
time.sleep(random.random())
@app.route('/add', methods=['POST'])
async def add():
global q
print(await quart.request.get_data())
values = await quart.request.form
print(f"Request: {dict(values)}")
if 'message' in values:
q.put_nowait(values['message'])
return 'Request received'
async def main():
tasks = [run_timer(0), process_queue(), app.run_task(port=8080,use_reloader=False)]
await asyncio.gather(*tasks)
asyncio.run(main())
Output:
Current value: 0
Start queue processing
[2021-08-14 12:51:49,541] Running on http://127.0.0.1:8080 (CTRL + C to quit)
Request: {'message': 'test'}
[2021-08-14 12:51:51,837] 127.0.0.1:59990 POST /add 1.1 200 16 1526
The message is sent with curl -d 'message=test' LOCAlhost:8080/add
It would also be nice to stop all coroutines on SIGTERM
After a few failed attempts with Quart, mostly because I couldn't figure out how to pass the queue to the request handler, I arrived at a solution using aiohttp.
from aiohttp import web
import asyncio
import queue
import functools
import random
from signal import SIGINT, SIGTERM
app = web.Application()
async def run_timer(q):
i = 0
while True:
print(f"[Timer] Send message: {i}")
await q.put(str(i))
i += 1
await asyncio.sleep(10)
async def process_queue(q):
print("[Queue] Start processing")
id = 0
while True:
val = await q.get()
print(f"[Queue] Process id={id}: {val}")
await asyncio.sleep(random.randint(3,5))
print(f"[Queue] Finished id={id}")
id += 1
async def add(q, request):
values = await request.post()
print(f"[Server] Received request: {dict(values)}")
if 'message' in values:
msg = values['message']
await q.put(msg)
print(f"[Server] Added to queue: {msg}")
return web.Response(text="Message added to queue\n")
async def start_server():
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
while True:
await asyncio.sleep(3600) # sleep forever
def handler(sig):
print(f"Received shutdown signal ({sig!s})")
loop = asyncio.get_running_loop()
for task in asyncio.all_tasks(loop=loop):
task.cancel()
loop.remove_signal_handler(SIGTERM)
loop.add_signal_handler(SIGINT, lambda: None)
async def main():
loop = asyncio.get_running_loop()
for sig in (SIGINT,SIGTERM):
loop.add_signal_handler(sig, handler, sig)
try:
q = asyncio.Queue(maxsize=5)
app.add_routes([web.post('/add', functools.partial(add, q))])
tasks = [run_timer(q), process_queue(q), start_server()]
# return_exceptions=False to catch errors easily
r = await asyncio.gather(*tasks)
except asyncio.CancelledError:
print("Stop")
asyncio.run(main())
I also added some cancellation logic. To test the web server, I used this command: for i in $(seq 1 10); do; curl -d "message=test$i" localhost:8080/add; done
Maybe this is useful to someone who is also learning how to use asyncio.