Search code examples
pythonwebsocketfastapibackground-taskstarlette

How to run a Background Task when using websockets in FastAPI/Starlette?


My ultimate goal is to write code that only triggers other servers when calling an endpoint and waits for data from a specific channel in Redis to come in. I don't want to know external server's business logic is done due to latency.

The call_external_server background task function below will notify the external server to send data to Redis (pub/sub). However, it doesn't execute.

Here is my code:

async def call_external_server(channel, text):
    print("call_external_server start")
    async with aiohttp.ClientSession() as session:
        async with session.get(f"http://localhost:9000/pub?channel={channel}&text={text}") as resp:
            print(resp)
    print("call_external_server finished")
    return {"response": "external_server is done"}


@app.websocket("/ws")
async def websocket_endpoint(channel: str, websocket: WebSocket, background_task: BackgroundTasks):
    await websocket.accept()
    client_info = dict(websocket.headers)
    text = client_info.get("text")
    redis_reader: redis.client.PubSub = await get_redis_pubsub()
    await redis_reader.subscribe(channel)
    

    # Problem is Here 
    background_task.add_task(call_external_server, channel, text)
    # Background task Doesn't work properly 

    try:
        while True:
            message = await redis_reader.get_message(ignore_subscribe_messages=True)
            if message is not None:
                decoded_msg = message["data"].decode()
                if decoded_msg == STOPWORD:
                    print("(Reader) STOP")
                    break
                await websocket.send_text(decoded_msg)
    except Exception as e:
        print(e)
        await websocket.close()
        return
    await websocket.close()
    return

Solution

  • A BackgroundTask is executed after returning a response to a client's request (not on ongoing websocket connections)—see this answer, as well as this answer and this related comment for more details on Background Tasks. As explained by @tiangolo (the creator of FastAPI):

    Background Tasks internally depend on a Request, and they are executed after returning the request to the client. In the case of a WebSocket, as the WebSocket is not really "finished" but is an ongoing connection, it wouldn't be a background task.


    You could, however, have functions executed in the background, using one of the options described in this answer. If your background task is an async def function, you should rather use Option 3 (see the linked answer above for more details), i.e., using asyncio.create_task(). Have a look at this related answer as well.

    Example

    async def call_external_server(channel, text):
        pass
    
    
    @app.websocket("/ws")
    async def websocket_endpoint(websocket: WebSocket):
       await websocket.accept()
       # ...
       asyncio.create_task(call_external_server(channel, text))
    

    I would also suggest having a look at this answer, as well as this answer and this answer on how to spawn an HTTP Client once at application startup and reuse it every time is needed, instead of creating a new connection every time the endpoint is called (as shown in the example provided in your question).