Search code examples
python · python-asyncio · quart

Running coroutines concurrently to a web server


I want to run a Quart server alongside other coroutines. However, it seems that the server is blocking the event loop. The server responds to requests, but neither the timing routine nor the queue-processing routine is running.

import asyncio
import random
import signal
import time

import quart

# Shared queue: the /add handler puts messages on it and process_queue()
# takes them off again.
# NOTE(review): this is created at import time, before asyncio.run() has
# started an event loop; on Python versions before 3.10 the queue may end up
# bound to a different loop than the one the tasks run on - confirm.
q = asyncio.Queue()

# Quart application on which the /add route below is registered.
app = quart.Quart(__name__)

async def run_timer(start=0):
  """Print an incrementing counter every two seconds, forever.

  Args:
    start: First value to print; each later value is one higher.

  The coroutine never returns on its own; it only ends when cancelled.
  """
  counter = start
  while True:
    print(f"Current value: {counter}")
    # Yield to the event loop between ticks.
    await asyncio.sleep(2)
    counter += 1

async def process_queue():
  """Consume messages from the module-level queue ``q`` forever.

  Every message is printed and then followed by a simulated blocking
  operation lasting a random fraction of a second.  The blocking call is
  handed to the event loop's default executor so that the other coroutines
  (timer, web server) keep running while it sleeps.

  The coroutine never returns on its own; it only ends when cancelled.
  """
  print("Start queue processing")
  loop = asyncio.get_running_loop()
  while True:
    val = await q.get()
    print(f"Queue: {val}")
    # Simulate blocking operation.  Calling time.sleep() directly inside a
    # coroutine would stall the whole event loop, so run it in a worker
    # thread and await its completion instead.
    await loop.run_in_executor(None, time.sleep, random.random())

@app.route('/add', methods=['POST'])
async def add():
  """Handle POST /add: log the request and enqueue its 'message' field.

  The raw body and the parsed form are both printed.  When the form has a
  'message' entry, its value is put on the module-level queue ``q``.

  Returns:
    The string 'Request received', whether or not a message was queued.
  """
  raw_body = await quart.request.get_data()
  print(raw_body)
  form = await quart.request.form
  print(f"Request: {dict(form)}")
  if 'message' in form:
    # put_nowait() does not suspend, so the handler returns straight away.
    q.put_nowait(form['message'])
  return 'Request received'

async def main():
  """Run the timer, the queue consumer and the Quart server concurrently.

  All three coroutines are passed to asyncio.gather(), which is awaited
  until they finish or one of them raises.
  """
  await asyncio.gather(
    run_timer(0),
    process_queue(),
    app.run_task(port=8080, use_reloader=False),
  )

# Script entry point: start an event loop and run main() until it completes.
asyncio.run(main())

Output:

Current value: 0
Start queue processing
[2021-08-14 12:51:49,541] Running on http://127.0.0.1:8080 (CTRL + C to quit)
Request: {'message': 'test'}
[2021-08-14 12:51:51,837] 127.0.0.1:59990 POST /add 1.1 200 16 1526

The message is sent with curl -d 'message=test' localhost:8080/add

It would also be nice to stop all coroutines on SIGTERM


Solution

  • After a few failed attempts with Quart, mostly because I couldn't figure out how to pass the queue to the request handler, I arrived at a solution using aiohttp.

    from aiohttp import web
    import asyncio
    import queue
    import functools
    import random
    from signal import SIGINT, SIGTERM
    
    # aiohttp application object; the /add route is registered on it in main().
    app = web.Application()
    
    async def run_timer(q):
      """Put an incrementing counter on ``q`` every ten seconds, forever.

      Args:
        q: Queue that receives each counter value as a string.

      The coroutine never returns on its own; it only ends when cancelled.
      """
      counter = 0
      while True:
        print(f"[Timer] Send message: {counter}")
        # put() suspends while the queue is full.
        await q.put(str(counter))
        await asyncio.sleep(10)
        counter += 1
    
    async def process_queue(q):
      """Take messages off ``q`` one at a time and simulate processing them.

      Args:
        q: Queue to consume from.

      Each message gets a sequential id (starting at 0) and is "processed"
      by sleeping for 3 to 5 seconds without blocking the event loop.
      The coroutine never returns on its own; it only ends when cancelled.
      """
      print("[Queue] Start processing")
      job_id = 0
      while True:
        message = await q.get()
        print(f"[Queue] Process id={job_id}: {message}")
        # Stand-in for real work; awaiting lets other tasks run meanwhile.
        delay = random.randint(3,5)
        await asyncio.sleep(delay)
        print(f"[Queue] Finished id={job_id}")
        job_id += 1
    
    async def add(q, request):
      """Handle POST /add: enqueue the form's 'message' field on ``q``.

      Args:
        q: Queue the message is put on (bound via functools.partial in main()).
        request: Incoming aiohttp request.

      Returns:
        A 200 text response once the message has been queued, or a 400
        text response when the form has no 'message' field.
      """
      values = await request.post()
      print(f"[Server] Received request: {dict(values)}")
      if 'message' not in values:
        # Every aiohttp handler must return a response; falling off the end
        # (returning None) would surface as a server error to the client.
        return web.Response(status=400, text="Missing 'message' field\n")

      msg = values['message']
      # put() suspends while the queue is full, so the client waits for room.
      await q.put(msg)
      print(f"[Server] Added to queue: {msg}")
      return web.Response(text="Message added to queue\n")
    
    async def start_server():
      """Serve ``app`` on localhost:8080 until this coroutine is cancelled.

      The runner is cleaned up on the way out, so the listening socket is
      released whether the task is cancelled or fails.
      """
      runner = web.AppRunner(app)
      await runner.setup()
      try:
        site = web.TCPSite(runner, 'localhost', 8080)
        await site.start()

        # site.start() returns as soon as the server is listening; keep this
        # task alive so that cancelling it is what stops the server.
        while True:
          await asyncio.sleep(3600) # sleep forever
      finally:
        await runner.cleanup()
    
    def handler(sig):
      """Signal callback: cancel every task on the running event loop.

      Args:
        sig: The signal that triggered the callback; only used for logging.

      After cancelling, the SIGTERM handler is removed and the SIGINT
      handler is replaced by a no-op, presumably so that further signals
      do not interrupt the shutdown already in progress.
      """
      print(f"Received shutdown signal ({sig!s})")
      current_loop = asyncio.get_running_loop()
      pending = asyncio.all_tasks(loop=current_loop)
      for pending_task in pending:
        pending_task.cancel()
      current_loop.remove_signal_handler(SIGTERM)
      current_loop.add_signal_handler(SIGINT, lambda: None)
    
    async def main():
      """Install signal handlers, wire up the app and run all tasks.

      SIGINT and SIGTERM are both routed to handler().  A bounded queue
      (5 items) is created and shared by the timer, the queue consumer and
      the /add route.  The three coroutines then run concurrently until
      they are cancelled, at which point "Stop" is printed.
      """
      running_loop = asyncio.get_running_loop()
      running_loop.add_signal_handler(SIGINT, handler, SIGINT)
      running_loop.add_signal_handler(SIGTERM, handler, SIGTERM)

      try:
        q = asyncio.Queue(maxsize=5)

        # Bind the queue as the handler's first argument; aiohttp then
        # supplies the request as the second one.
        add_with_queue = functools.partial(add, q)
        app.add_routes([web.post('/add', add_with_queue)])

        # return_exceptions=False to catch errors easily
        await asyncio.gather(run_timer(q), process_queue(q), start_server())
      except asyncio.CancelledError:
        print("Stop")
    
    # Script entry point: start an event loop and run main() until it completes.
    asyncio.run(main())
    

    I also added some cancellation logic. To test the web server, I used this command: for i in $(seq 1 10); do curl -d "message=test$i" localhost:8080/add; done

    Maybe this is useful to someone who is also learning how to use asyncio.