I've got a FastAPI app with a route that calls an external API with aiohttp. The external API streams the response, and I'd like to stream my response as well. Here's a simplified version of the router I'm using.
from fastapi import APIRouter
from fastapi.responses import StreamingResponse
import aiohttp
router = APIRouter()
@router.post("/")
async def proxy_stream():
async with aiohttp.ClientSession() as http_session:
async with http_session.post(
"https://streaming-api",
json={"json": "body"}
) as response:
async def process_response():
async for chunk in response.content.iter_chunked(128):
# Work on chunk and then stream it
yield process_chunk(chunk)
return StreamingResponse(
process_response()
)
Unfortunately, I keep getting 'aiohttp.client_exceptions.ClientConnectionError: Connection closed' errors. It seems the response is being read outside of its context manager, but not sure how I could solve that.
Any tips?
After the return statement, the context managers are closed and the session is no longer active
To avoid this you can create session inside your generator function:
@router.post("/")
async def proxy_stream():
async def process_response():
async with aiohttp.ClientSession() as http_session:
async with http_session.post(
"https://streaming-api",
json={"json": "body"}
) as response:
async for chunk in response.content.iter_chunked(128):
# Work on chunk and then stream it
yield process_chunk(chunk)
return StreamingResponse(
process_response()
)