How to avoid duplicate processing in a Python API server?


Suppose a function detect_primes is expensive to call, and I would like to avoid repeated calls to it with duplicate parameters. What should I do?

A cache alone does not help, because the function can be called concurrently from different requests. If both requests find no cached value, both will go on to execute the expensive function.

def detect_primes(nums: List[int]) -> Dict[int, bool]:
    """Detect whether each number in a list is prime."""


@app.route('/detect', methods=['GET'])
def search():
    args = request.args
    nums = list(map(int, args.get('nums', '').split(',')))
    return detect_primes(nums)

For example, if one user requests 13,14,15 and another user requests 15,16, the answers are {"13": true, "14": false, "15": false} and {"15": false, "16": false}.

I would like to avoid calling detect_primes separately with [13, 14, 15] and [15, 16], since 15 would then be processed twice. Ideally, both requests should wait for a single call with [13, 14, 15, 16] (or two calls with [13, 14, 15] and [16]) and each return its respective results.

The choice of web framework is not important to me; you can assume it is Flask or FastAPI.

EDIT: I am not sure how this question is a duplicate of, or is answered in, "Are global variables thread-safe in Flask? How do I share data between requests?". As explained above, a cache alone can't be used (be it an in-memory Python cache, an external cache, or a database). I am happy to be proven wrong by an answer.


Solution

  • As per FastAPI's documentation:

    when you declare a path operation function with normal def instead of async def, it is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server).

    In your case, and since you write "Ideally both requests should wait for...", you could define the /detect endpoint with async def and not await any coroutines (i.e., async def functions) inside it. An await would pass control back to the event loop, allowing other tasks/requests in the event loop to run. As explained in this answer, which I would highly suggest you take a look at, async def endpoints run directly in the event loop, whereas normal def endpoints run in a separate thread from an external threadpool and are then awaited (which is useful when the endpoint performs blocking operations that would otherwise block the event loop if it were defined with async def).

    Thus, an async def endpoint without any await calls inside causes the application to process all requests to the /detect endpoint sequentially within a single worker process (the event loop serves no other request while the function runs). In this way, you could keep a dict that caches previously computed results and use it to quickly look up a specific number in subsequent requests (you should also ensure that /detect is the only endpoint that modifies that dict). You could also limit the size of the dictionary, using a similar approach to this. The example below simply declares the dict globally in the application, but in real-world scenarios it would be preferable to create the dict at application startup and store it on request.state, which would also allow one to access it from endpoints defined outside the main application (usually in APIRouters). Have a look at this answer on how to do that.

    Example

    You could test the example below through the Swagger UI autodocs at http://127.0.0.1:8000/docs, or by typing a URL directly in the address bar of your web browser, e.g., http://127.0.0.1:8000/detect?nums=13&nums=14&nums=15.

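    from math import isqrt
    from typing import Dict, List

    from fastapi import FastAPI, Query

    app = FastAPI()
    d: Dict[int, bool] = {}  # cache of previously computed results


    def is_prime(n: int) -> bool:
        # Check whether 'n' is prime (simple trial division; swap in your own check)
        if n < 2:
            return False
        return all(n % i for i in range(2, isqrt(n) + 1))


    def detect_primes(nums: List[int]) -> Dict[int, bool]:
        res = {}
        for n in nums:
            if n in d:
                res[n] = d[n]
                print(f'{n} found in dict')
            else:
                # Compute once and store the result for subsequent requests
                is_n_prime = is_prime(n)
                res[n] = is_n_prime
                d[n] = is_n_prime
        return res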
    
    
    @app.get("/detect")
    async def search(nums: List[int] = Query(...)):
        return detect_primes(nums)
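
The remark above about limiting the size of the dictionary could be sketched roughly as follows. This is only one possible approach (least-recently-used eviction built on collections.OrderedDict) and not necessarily the one the linked answer uses. The class name BoundedCache and its max_size parameter are hypothetical names chosen for illustration.

```python
from collections import OrderedDict
from typing import Optional


class BoundedCache:
    """Hypothetical LRU-style cache holding at most max_size entries."""

    def __init__(self, max_size: int = 1000) -> None:
        self.max_size = max_size
        self._data: "OrderedDict[int, bool]" = OrderedDict()

    def get(self, key: int) -> Optional[bool]:
        # Returns None when the key has not been cached (or was evicted).
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: int, value: bool) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry

    def __len__(self) -> int:
        return len(self._data)
```

In the example above, d could then be an instance of such a class, with detect_primes calling get and put instead of indexing the dict directly.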
    

    Solutions when having await calls inside the endpoint

    If, however, you need to make await calls inside your async def endpoint, requests will be processed concurrently, and hence the dict of prime numbers could be accessed by multiple requests at the same time. In that case you could use, for instance, a Semaphore object to control access to the dictionary, as described here. However, if you plan on having multiple workers active at the same time, each worker has its own variables (workers don't share memory), so you should rather use database storage, such as PostgreSQL or SQLite, or key-value stores (caches), such as Redis (have a look at the answers here and here). You may also want to try aioredlock, which allows "creating distributed locks between workers (processes)", as described here.
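
As a rough illustration of the Semaphore idea for a single worker, the sketch below guards the check-compute-store sequence with asyncio.Semaphore(1), so a second request waits until the first has filled the cache (asyncio.Lock would work equally well here). The names GuardedPrimeCache, _expensive_check and demo are hypothetical, and asyncio.sleep merely stands in for whatever the endpoint actually awaits.

```python
import asyncio
from math import isqrt
from typing import Dict, List


def is_prime(n: int) -> bool:
    # Simple trial division, used only to make the sketch runnable.
    if n < 2:
        return False
    return all(n % i for i in range(2, isqrt(n) + 1))


class GuardedPrimeCache:
    """Hypothetical helper: a dict of results guarded by a semaphore."""

    def __init__(self) -> None:
        self.results: Dict[int, bool] = {}
        self.computed = 0  # counts expensive calls, for demonstration only
        # Instantiate this class inside the running event loop (e.g. at startup).
        self.guard = asyncio.Semaphore(1)

    async def _expensive_check(self, n: int) -> bool:
        self.computed += 1
        await asyncio.sleep(0.01)  # stands in for an awaited call
        return is_prime(n)

    async def detect_primes(self, nums: List[int]) -> Dict[int, bool]:
        res = {}
        async with self.guard:  # only one request runs this section at a time
            for n in nums:
                if n not in self.results:
                    self.results[n] = await self._expensive_check(n)
                res[n] = self.results[n]
        return res


async def demo():
    cache = GuardedPrimeCache()
    # Two overlapping "requests" issued concurrently.
    first, second = await asyncio.gather(
        cache.detect_primes([13, 14, 15]),
        cache.detect_primes([15, 16]),
    )
    return first, second, cache.computed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Holding the guard for the whole request keeps the sketch short but serializes all requests; it also only protects a single process, so the multi-worker caveat above still applies.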