I would like to write tests for my FastApi WebSocket application, but a single test runs forever and doesn't stop, which prevents the next test to start.
I invoke a thread during the FastApi app "startup" which itself spawns processes. I feel like the combination of using threads and processes does create some issues within the TestClient and by that does the test never stop.
import uvicorn as uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import threading
import asyncio
from fastapi.testclient import TestClient
from multiprocessing import Process
app = FastAPI()
class ProcessHeavyCPU:
def __init__(self): pass
def run(self):
while True:
print(f"Thread ID: [{str(threading.get_ident())}] | endless loop within process")
class ThreadHandler(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
processes = []
for _ in range(2):
p = Process(target=ProcessHeavyCPU().run)
processes.append(p)
[x.start() for x in processes]
while True: # <------ the test "test_example" runs forever
print(f"Thread ID: [{str(threading.get_ident())}] | endless loop within thread")
@app.on_event("startup")
async def startup_event():
thread1 = ThreadHandler()
thread1.daemon = True
thread1.start()
async def handle_msg(websocket):
await websocket.send_json({"msg": "New msg received"})
background_tasks = set()
@app.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
msg = await websocket.receive_text()
task = asyncio.create_task(handle_msg(websocket))
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
except WebSocketDisconnect:
await websocket.close()
def test_example(): # <-- the test which does run forever
with TestClient(app) as client:
with client.websocket_connect("/") as websocket:
websocket.send_json({
"new_msg": "xyz"
})
resp = websocket.receive_json()
assert resp["msg"] == "New msg received"
print("finished test")
if __name__ == '__main__':
uvicorn.run("main:app", host="0.0.0.0", port=8081, reload=True, access_log=False)
Any ideas how this can be fixed, so I can use threads & processes within my app while using the comfortable TestClient for FastApi (Starlette) WebSocket testing?
python -m pytest .\main.py
Python 3.10.5
Documentation:
I am not sure this one is you are looking or not. But the reason that your test doesn't stop is because your thread is not stopped. So instead only start the thread on startup you probably also to stop it on shutdown. So it should be like this:
import uvicorn as uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import threading
import asyncio
from fastapi.testclient import TestClient
from multiprocessing import Process
app = FastAPI()
class ProcessHeavyCPU:
def __init__(self): pass
def run(self):
while True:
print(f"Thread ID: [{str(threading.get_ident())}] | endless loop within process")
class ThreadHandler(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.running = True
def run(self):
processes = []
for _ in range(2):
p = Process(target=ProcessHeavyCPU().run)
processes.append(p)
[x.start() for x in processes]
while self.running: # <------ the test "test_example" runs forever
print(f"Thread ID: [{str(threading.get_ident())}] | endless loop within thread")
def stop(self):
self.running = False
thread1 = ThreadHandler()
@app.on_event("startup")
async def startup_event():
thread1.daemon = True
thread1.start()
@app.on_event("shutdown")
async def startup_event():
thread1.stop()
async def handle_msg(websocket):
await websocket.send_json({"msg": "New msg received"})
background_tasks = set()
@app.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
msg = await websocket.receive_text()
task = asyncio.create_task(handle_msg(websocket))
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
except WebSocketDisconnect:
await websocket.close()
def test_example(): # <-- the test which does run forever
with TestClient(app) as client:
with client.websocket_connect("/") as websocket:
websocket.send_json({
"new_msg": "xyz"
})
resp = websocket.receive_json()
assert resp["msg"] == "New msg received"
print("finished test")
if __name__ == '__main__':
uvicorn.run("main:app", host="0.0.0.0", port=8081, reload=True, access_log=False)