Python and Starlette: running a long async task


I have a simple experiment in the code snippet shown below. My goal is to have the browser client (via a WebSocket) kick off a long-running task on the server, but the server should service WebSocket messages from the client while the long-running task is running. Here's the workflow ("OK" means this step is working as-is in the snippet, while "?" means this is what I'm trying to figure out)...

  • OK - Run the code
  • OK - Launch a browser at 127.0.0.1
  • OK - WebSocket connects
  • OK - Click "Send" and the browser client generates a random number, sends it to the server, and the server echoes back the number
  • OK - Click "Begin" and this invokes a long-running task on the server (5.0 seconds)
  • ? - During these 5 seconds (while the long-running task is running), I'd like to click "Send" and have the server immediately echo back the random number sent from the client, while the long-running task continues to execute concurrently in the event loop

That last bullet point does not work that way: if you click "Send" while the long process is running, the numbers are echoed back only after the long process finishes. To me, this demonstrates that await simulate_long_process(websocket) truly waits for simulate_long_process() to complete, which makes sense. However, part of me was expecting that await simulate_long_process(websocket) would signal the event loop that it could go work on other tasks, and therefore go back to the while True loop to service the next incoming messages. I was expecting this because simulate_long_process() is fully async (async def, await websocket.send_text(), and await asyncio.sleep()). The current behavior kinda makes sense, but it is not what I want. So my question is: how can I achieve my goal of responding to incoming messages on the WebSocket while the long-running task is running? I am interested in two (or more) approaches:

  1. Spawning the long-running task in a different thread. For example, with asyncio.to_thread() or by stuffing a message into a separate queue that another thread is reading, which then executes the long-running task (e.g. like a producer/consumer queue). Furthermore, I can see how using those same queues, at the end of the long-running tasks, I could then send acknowledgment messages back to the Starlette/async thread and then back to the client over the WebSocket to tell them a task has completed.
  2. Somehow achieving this "purely async"? "Purely async" means mostly or entirely using features/methods from the asyncio package. This might delve into synchronous or blocking code, but here I'm thinking about things like: organizing my coroutines into a TaskGroup() object to get concurrent execution, using call_soon(), using run_in_executor(), etc. I'm really interested in hearing about this approach! But I'm skeptical since it may be convoluted. The spirit of this is mentioned here: Long-running tasks with async server

I can certainly see the path to completion on approach (1). So I'm debating how "pure async" I should try to go -- maybe Starlette (running in its own thread) is the only async portion of my entire app, and the rest of my (CPU-bound, blocking) app is on a different (synchronous) thread. Then the Starlette async thread and the CPU-bound sync thread simply coordinate via a queue. This is where I'm headed, but I'd like to hear some thoughts on whether a "pure async" approach could be reasonably implemented. Stated differently, if someone could refactor the code snippet below to work as intended (responding immediately to "Send" while the long-running task is running), using only or mostly methods from asyncio, then that would be a good demonstration.

from starlette.applications import Starlette
from starlette.responses import HTMLResponse
from starlette.routing import Route, WebSocketRoute
import uvicorn
import asyncio

index_str = """<!DOCTYPE HTML>
<html>
<head>
    <script type = "text/javascript">
    const websocket = new WebSocket("ws://127.0.0.1:80");
    window.addEventListener("DOMContentLoaded", () => {
        websocket.onmessage = ({ data }) => {
            console.log('Received: ' + data)
            document.body.innerHTML += data + "<br>";
        };
    });
    </script>
</head>
<body>
    WebSocket Async Experiment<br>
    <button onclick="websocket.send(Math.floor(Math.random()*10))">Send</button><br>
    <button onclick="websocket.send('begin')">Begin</button><br>
    <button onclick="websocket.send('close')">Close</button><br>
</body>
</html>
"""

def homepage(request):
    return HTMLResponse(index_str)


async def simulate_long_process(websocket):
    await websocket.send_text(f'Running long process...')
    await asyncio.sleep(5.0)


async def websocket_endpoint(websocket):
    await websocket.accept()
    await websocket.send_text(f'Server connected')
    while True:
        msg = await websocket.receive_text()
        print(f'server received: {msg}')
        if msg == 'begin':
            await simulate_long_process(websocket)
        elif msg == 'close':
            await websocket.send_text('Server closed')
            break
        else:
            await websocket.send_text(f'Server received {msg} from client')
    await websocket.close()
    print('Server closed')


if __name__ == '__main__':

    routes = [
        Route('/', homepage),
        WebSocketRoute('/', websocket_endpoint) ]

    app = Starlette(debug=True, routes=routes)
    uvicorn.run(app, host='0.0.0.0', port=80)

Solution

    First:

    However, part of me was expecting that await simulate_long_process(websocket) would signal the event loop that it could go work on other tasks

    That is exactly what await means: "stop executing this coroutine (websocket_endpoint) while we wait for a result from simulate_long_process, and go service other coroutines".

    As it happens, you don't have any other task running that could read from the WebSocket, so the handler just stays paused until simulate_long_process returns.
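To make the difference concrete, here is a minimal sketch that uses only asyncio and no Starlette. The names long_task, echo, sequential and concurrent are made up for this illustration, and the 0.2-second sleep stands in for the 5-second job. It compares awaiting a coroutine directly with scheduling it through asyncio.create_task:

```python
import asyncio


async def long_task(log):
    log.append("long: start")
    await asyncio.sleep(0.2)  # stand-in for the 5-second job
    log.append("long: done")


async def echo(log, n):
    log.append(f"echo {n}")


async def sequential():
    log = []
    await long_task(log)  # nothing else is scheduled, so we simply wait here
    await echo(log, 1)
    return log


async def concurrent():
    log = []
    task = asyncio.create_task(long_task(log))  # scheduled as its own task
    await asyncio.sleep(0)  # yield once so the new task gets to start
    await echo(log, 1)      # runs while long_task is sleeping
    await task              # wait for the job before returning
    return log


sequential_log = asyncio.run(sequential())
concurrent_log = asyncio.run(concurrent())
print(sequential_log)  # ['long: start', 'long: done', 'echo 1']
print(concurrent_log)  # ['long: start', 'echo 1', 'long: done']
```

In the sequential version the echo can only happen after the long task returns, which matches the behavior described in the question. In the concurrent version the echo lands between the start and the end of the long task.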

    Second:

    Even if you were to run simulate_long_process concurrently (e.g., by creating a task using asyncio.create_task and then checking if it's complete), your while loop blocks waiting for text from the client. This means that you can't, for instance, send the client a message when simulate_long_process completes, because the client needs to send you something before the body of the while loop can execute.
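One way around this, sketched below under some assumptions, is to let the background task send its own completion message instead of having the receive loop poll for it. FakeSocket is a hypothetical in-memory stand-in for a WebSocket (it is not a Starlette class), client is a made-up coroutine that plays the browser, and the delays are shortened so the example runs quickly:

```python
import asyncio


class FakeSocket:
    """Hypothetical in-memory stand-in for a WebSocket (not a Starlette class)."""

    def __init__(self):
        self.inbox = asyncio.Queue()
        self.sent = []

    async def receive_text(self):
        return await self.inbox.get()

    async def send_text(self, text):
        self.sent.append(text)


async def long_process(ws):
    await ws.send_text("long process started")
    await asyncio.sleep(0.2)  # stand-in for the 5-second job
    await ws.send_text("long process finished")  # the task reports completion itself


async def handler(ws):
    task = None
    while True:
        msg = await ws.receive_text()
        if msg == "begin":
            # Schedule the job and go straight back to receive_text().
            task = asyncio.create_task(long_process(ws))
        elif msg == "close":
            break
        else:
            await ws.send_text(f"echo {msg}")
    if task is not None:
        await task  # let the job finish before returning


async def client(ws):
    await ws.inbox.put("begin")
    await asyncio.sleep(0.05)
    await ws.inbox.put("7")  # arrives while the job is still running
    await asyncio.sleep(0.3)
    await ws.inbox.put("close")


async def demo():
    ws = FakeSocket()
    await asyncio.gather(handler(ws), client(ws))
    return ws.sent


sent = asyncio.run(demo())
print(sent)
```

The receive loop never awaits the job directly, so the echo for "7" is sent between the "started" and "finished" messages.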


    I haven't worked with Starlette before, so this may not be the most canonical solution, but here's an implementation that uses a WebSocketEndpoint to implement the desired behavior:

    from starlette.applications import Starlette
    from starlette.responses import HTMLResponse
    from starlette.routing import Route, WebSocketRoute
    from starlette.endpoints import WebSocketEndpoint
    import uvicorn
    import asyncio
    
    SERVER_PORT = 8000
    
    index_str = """<!DOCTYPE HTML>
    <html>
    <head>
        <script type = "text/javascript">
        const websocket = new WebSocket("ws://127.0.0.1:%s");
        window.addEventListener("DOMContentLoaded", () => {
            websocket.onmessage = ({ data }) => {
                console.log('Received: ' + data)
                document.body.innerHTML += data + "<br>";
            };
        });
        </script>
    </head>
    <body>
        WebSocket Async Experiment<br>
        <button onclick="websocket.send(Math.floor(Math.random()*10))">Send</button><br>
        <button onclick="websocket.send('begin')">Begin</button><br>
        <button onclick="websocket.send('close')">Close</button><br>
    </body>
    </html>
    """ % (SERVER_PORT)
    
    def homepage(request):
        return HTMLResponse(index_str)
    
    class Consumer(WebSocketEndpoint):
        encoding = 'text'
        task = None
    
        async def on_connect(self, ws):
            await ws.accept()
    
        async def on_receive(self, ws, data):
            match data:
                case 'begin':
                    if self.task is not None:
                        await ws.send_text('background task is already running')
                        return
    
                    await ws.send_text('start background task')
                    self.task = asyncio.create_task(self.simulate_long_task(ws))
                case 'close':
                    await ws.send_text('closing connection')
                    await ws.close()
                case _:
                    await ws.send_text(f'Server received {data} from client')
    
        async def simulate_long_task(self, ws):
            await ws.send_text('start long process')
            await asyncio.sleep(5)
            await ws.send_text('finish long process')
            self.task = None
    
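        async def on_disconnect(self, ws, close_code):
            # Cancel a still-running background task so it does not try
            # to send on a socket that has already been closed.
            if self.task is not None:
                self.task.cancel()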
    
    if __name__ == '__main__':
    
        routes = [
            Route('/', homepage),
            WebSocketRoute('/', Consumer) ]
    
        app = Starlette(debug=True, routes=routes)
        uvicorn.run(app, host='0.0.0.0', port=SERVER_PORT)
    

    (Note that this by default uses port 8000 instead of port 80 because I already have something running on port 80 locally.)
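For approach (1) from the question, where the long-running work is blocking rather than async, a rough sketch using asyncio.to_thread (available from Python 3.9) might look like the following. blocking_job is a hypothetical stand-in for the real work, and the "tick" entries stand in for the event loop servicing WebSocket messages in the meantime:

```python
import asyncio
import time


def blocking_job(seconds):
    # Hypothetical stand-in for blocking, non-async work.
    time.sleep(seconds)
    return "job result"


async def main():
    events = []

    async def run_job():
        # to_thread runs the function in a worker thread, so the event
        # loop stays free while the job blocks.
        result = await asyncio.to_thread(blocking_job, 0.3)
        events.append(result)

    task = asyncio.create_task(run_job())
    for i in range(3):
        await asyncio.sleep(0.05)  # the loop keeps servicing other work
        events.append(f"tick {i}")
    await task
    return events


events = asyncio.run(main())
print(events)
```

The ticks are recorded while the job is still blocked in its thread. For genuinely CPU-bound pure-Python code the GIL limits how much a thread helps, so a process pool via loop.run_in_executor may be a better fit; that part is not shown here.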