Search code examples
fastapiaiohttp

aiohttp connection closing before FastAPI Streaming Response


I've got a FastAPI app with a route that calls an external API with aiohttp. The external API streams the response, and I'd like to stream my response as well. Here's a simplified version of the router I'm using.

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
import aiohttp

router = APIRouter()

@router.post("/")
async def proxy_stream():
    async with aiohttp.ClientSession() as http_session:
        async with http_session.post(
            "https://streaming-api",
            json={"json": "body"}
        ) as response:
            
            async def process_response():
                async for chunk in response.content.iter_chunked(128):
                    # Work on chunk and then stream it
                    yield process_chunk(chunk)

            return StreamingResponse(
                process_response()
            )

Unfortunately, I keep getting 'aiohttp.client_exceptions.ClientConnectionError: Connection closed' errors. It seems the response is being read outside of its context manager, but not sure how I could solve that.

Any tips?


Solution

  • After the return statement, the context managers are closed and the session is no longer active

    To avoid this you can create session inside your generator function:

    @router.post("/")
    async def proxy_stream():
    
        async def process_response():
            async with aiohttp.ClientSession() as http_session:
                async with http_session.post(
                    "https://streaming-api",
                    json={"json": "body"}
                ) as response:
                    async for chunk in response.content.iter_chunked(128):
                        # Work on chunk and then stream it
                        yield process_chunk(chunk)
    
               
        return StreamingResponse(
            process_response()
        )