Tags: python, python-3.x, fastapi, concurrent.futures, asgi

Is having a concurrent.futures.ThreadPoolExecutor call dangerous in a FastAPI endpoint?


I have the following test code:

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

I need to use the concurrent.futures.ThreadPoolExecutor part of the code in a FastAPI endpoint.

My concern is the impact of a large number of API calls when each call spawns its own threads: creating too many threads could starve the host and crash the application and/or the host.

Any thoughts or gotchas on this approach?


Solution

  • You should use the HTTPX library instead, which provides an async API. As described in this answer, you create a Client once and reuse it every time you need it. To make asynchronous requests with HTTPX, you'll need an AsyncClient.

    You can also control the connection pool size through the limits keyword argument of the Client, which takes an instance of httpx.Limits. For example:

    limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
    client = httpx.AsyncClient(limits=limits)
    

    You can adjust the above per your needs. As per the documentation on Pool limit configuration:

    • max_keepalive_connections, number of allowable keep-alive connections, or None to always allow. (Default 20)
    • max_connections, maximum number of allowable connections, or None for no limits. (Default 100)
    • keepalive_expiry, time limit on idle keep-alive connections in seconds, or None for no limits. (Default 5)

    If you would like to adjust the timeout as well, you can use the timeout parameter to set a timeout on an individual request, or on a Client/AsyncClient instance, in which case the given timeout becomes the default for requests made with that client (see the implementation of the Timeout class as well). You can specify the timeout behavior in fine-grained detail; for example, the read timeout specifies the maximum duration to wait for a chunk of data to be received (i.e., a chunk of the response body). If HTTPX is unable to receive data within this time frame, a ReadTimeout exception is raised. If set to None instead of a positive number, there will be no timeout on read. The default is a 5-second timeout on all operations.

    You can use await client.aclose() to explicitly close the AsyncClient when you are done with it (this could be done inside a shutdown event handler, for instance).

    To run multiple asynchronous operations concurrently (you need to request five different URLs each time your API endpoint is called), you can use the awaitable asyncio.gather(). It runs the async operations concurrently and returns a list of results in the same order in which the awaitables (tasks) were passed to it.
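    To illustrate the ordering guarantee, here is a small stdlib-only sketch. The fake_fetch coroutine is a hypothetical stand-in for an HTTP request. It also shows return_exceptions=True, which may be useful when one of the URLs can fail (as with the made-up domain in the question), since by default the first exception propagates out of the gather() call.

```python
import asyncio


async def fake_fetch(name: str, delay: float) -> str:
    # Hypothetical stand-in for `await client.get(url)`: sleeps, then returns or fails.
    await asyncio.sleep(delay)
    if name == "bad":
        raise ConnectionError(f"could not reach {name}")
    return f"{name} done"


async def main() -> list:
    # "slow" finishes last, yet its result still comes first in the returned list.
    tasks = [fake_fetch("slow", 0.03), fake_fetch("bad", 0.01), fake_fetch("fast", 0.0)]
    # return_exceptions=True places exceptions in the result list instead of raising.
    return await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(main())
for r in results:
    print("failed:" if isinstance(r, Exception) else "ok:", r)
```

    With this option the endpoint can report per-URL failures rather than returning a 500 error because a single request failed.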

    Working Example

    from fastapi import FastAPI, Request
    from contextlib import asynccontextmanager
    import httpx
    import asyncio
    
    
    URLS = ['https://www.foxnews.com/',
            'https://edition.cnn.com/',
            'https://www.nbcnews.com/',
            'https://www.bbc.co.uk/',
            'https://www.reuters.com/']
    
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # customize settings
        limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
        timeout = httpx.Timeout(5.0, read=15.0)  # 15s timeout on read. 5s timeout elsewhere.
    
        # Initialize the Client on startup and add it to the state
        async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
            yield {'client': client}
            # The Client closes on shutdown 
    
    
    app = FastAPI(lifespan=lifespan)
    
    
    async def send(url, client):
        return await client.get(url)
    
    
    @app.get('/')
    async def main(request: Request):
        client = request.state.client
        tasks = [send(url, client) for url in URLS]
        responses = await asyncio.gather(*tasks)
        return [r.text[:50] for r in responses]  # for demo purposes, only return the first 50 chars of each response
    

    If you would like to avoid reading each response's entire body into RAM, you could use streaming responses in HTTPX together with FastAPI's StreamingResponse, as described in this answer and demonstrated below:

    from fastapi import FastAPI, Request
    from fastapi.responses import StreamingResponse
    from contextlib import asynccontextmanager
    import httpx
    import asyncio
    
    
    URLS = ['https://www.foxnews.com/',
            'https://edition.cnn.com/',
            'https://www.nbcnews.com/',
            'https://www.bbc.co.uk/',
            'https://www.reuters.com/']
    
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # customize settings
        limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
        timeout = httpx.Timeout(5.0, read=15.0)  # 15s timeout on read. 5s timeout elsewhere.
    
        # Initialize the Client on startup and add it to the state
        async with httpx.AsyncClient(limits=limits, timeout=timeout) as client:
            yield {'client': client}
            # The Client closes on shutdown 
    
    
    app = FastAPI(lifespan=lifespan)
    
    
    async def send(url, client):
        req = client.build_request('GET', url)
        return await client.send(req, stream=True)
    
    
    async def iter_content(responses):
        for r in responses:
            async for chunk in r.aiter_text():
                yield chunk[:50]  # for demo purposes, return only the first 50 chars of each response and then break the loop
                yield '\n\n'
                break
            await r.aclose()
    
    
    @app.get('/')
    async def main(request: Request):
        client = request.state.client
        tasks = [send(url, client) for url in URLS]
        responses = await asyncio.gather(*tasks)
        return StreamingResponse(iter_content(responses), media_type='text/event-stream')
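    One gotcha with the streaming variant: iter_content only closes a response once the loop reaches it, so if the caller disconnects midway, the remaining streamed responses may stay open and keep holding connections from the pool. A possible variation, sketched here under assumptions, closes every response in a finally block. FakeResponse is a hypothetical stand-in for a streamed httpx response so that the sketch runs without network access; with real responses, the trade-off is that connections are held until streaming ends.

```python
import asyncio


class FakeResponse:
    """Hypothetical stand-in for a streamed HTTPX response."""

    def __init__(self, text: str):
        self._text = text
        self.closed = False

    async def aiter_text(self):
        yield self._text

    async def aclose(self):
        self.closed = True


async def iter_content(responses):
    try:
        for r in responses:
            async for chunk in r.aiter_text():
                yield chunk[:50]  # demo only: first 50 chars of each response
                yield '\n\n'
                break
    finally:
        # Runs on normal completion and when the generator is closed early.
        for r in responses:
            await r.aclose()


async def demo():
    responses = [FakeResponse("a" * 60), FakeResponse("b")]
    gen = iter_content(responses)
    first = await gen.__anext__()  # consume one chunk only
    await gen.aclose()             # simulate the consumer going away early
    return first, [r.closed for r in responses]


first_chunk, closed_flags = asyncio.run(demo())
print(len(first_chunk), closed_flags)
```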