Tags: python, machine-learning, multiprocessing, fastapi

How to process requests from multiple users using an ML model and FastAPI?


I'm studying how to deploy artificial intelligence modules through FastAPI.

I created a FastAPI app that answers questions using a pre-trained Machine Learning model.

It is not a problem when a single user uses it, but when multiple users use it at the same time, responses can become very slow.

Hence, when multiple users enter questions, is there any way to copy the model and have it loaded in advance, so that their requests can be handled concurrently?

class sentencebert_ai():
    def __init__(self) -> None:
        super().__init__()

    def ask_query(self, query, topN):
        startt = time.time()

        ask_result = []
        score = []
        result_value = []
        embedder = torch.load(model_path)
        corpus_embeddings = embedder.encode(corpus, convert_to_tensor=True)
        query_embedding = embedder.encode(query, convert_to_tensor=True)
        cos_scores = util.pytorch_cos_sim(query_embedding, corpus_embeddings)[0]  # torch.Size([121]): cosine similarity scores against the 121 corpus entries
        cos_scores = cos_scores.cpu()

        top_results = np.argpartition(-cos_scores, range(topN))[0:topN]

        for idx in top_results[0:topN]:
            ask_result.append(corpusid[idx].item())
            # .item() extracts the plain Python number from a 0-dim tensor, e.g. tensor(5)
            score.append(round(cos_scores[idx].item(), 3))

        # build the result as a JSON array for the server response
        for i, e in zip(ask_result, score):
            result_value.append({"pred_id": i, "pred_weight": e})
        endd = time.time()
        print('elapsed time', endd - startt)
        return result_value
        # return ','.join(str(e) for e in ask_result),','.join(str(e) for e in score)



class Item_inference(BaseModel):
    text : str
    topN : Optional[int] = 1

@app.post("/retrieval", tags=["knowledge recommendation"])
async def Knowledge_recommendation(item: Item_inference):
  
    # db.append(item.dict())
    item.dict()
    results = _ai.ask_query(item.text, item.topN)

    return results


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", default='9003', type=int)
    # parser.add_argument("--mode", default='cpu', type=str, help='cpu for CPU mode, gpu for GPU mode')
    args = parser.parse_args()

    _ai = sentencebert_ai()
    uvicorn.run(app, host="0.0.0.0", port=args.port, workers=4)  # note: to actually use workers > 1, uvicorn requires the app as an import string, e.g. uvicorn.run("main:app", ...)

Corrected version

@app.post("/aaa")
def your_endpoint(request: Request, item: Item_inference):
    start = time.time()
    model = request.app.state.model
    item.dict()  # needed when running the kernel
    _ai = sentencebert_ai()
    results = _ai.ask_query(item.text, item.topN, model)
    end = time.time()
    print(end - start)
    return results

Solution

  • First, you should not load your model every time a request arrives, but rather have it loaded once at startup (you could use the startup event for this) and store it on the app instance—using the generic app.state attribute (see the implementation of State too)—which you can later retrieve, as described here and here.

    Update: the startup event has since been deprecated, and the recommended way to handle startup and shutdown events is now the lifespan handler, as demonstrated in this answer (a lifespan-based sketch is also included right after the example below). You might still find the references provided earlier useful, as they cover additional FastAPI concepts. For now, you could keep using the startup event, but it is recommended not to, as it might be removed entirely from future FastAPI/Starlette versions.

    For instance:

    from fastapi import Request
    
    @app.on_event("startup")
    async def startup_event():
        app.state.model = torch.load('<model_path>')
    

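    For reference, here is a minimal sketch of the equivalent lifespan-based setup (same torch.load('<model_path>') placeholder as above; the shutdown cleanup shown is optional and purely illustrative):

    from contextlib import asynccontextmanager
    from fastapi import FastAPI
    import torch


    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # startup: load the model once and keep it on the app instance
        app.state.model = torch.load('<model_path>')
        yield
        # shutdown: release the reference (optional cleanup)
        del app.state.model


    app = FastAPI(lifespan=lifespan)
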
    Second, if you do not have any async def functions inside your endpoint that you have to await, you could define your endpoint with normal def instead. In this way, FastAPI will run requests to that def endpoint in a separate thread from an external threadpool, which will then be awaited (so that the blocking operations inside won't block the event loop); whereas, async def endpoints run directly in the event loop, and thus any synchronous blocking operations inside would block the event loop. Please have a look at the answers here and here, as well as all the references included in them, in order to understand the concept of async/await, as well as the difference between using def and async def in FastAPI. Example with normal def endpoint:

    @app.post('/')
    def your_endpoint(request: Request, item: Item_inference):
        model = request.app.state.model
        # run your synchronous ask_query() function here, e.g.:
        # results = ai.ask_query(model, item.text, item.topN)
        # return results
    

    Alternatively, as described here, you could use an async def endpoint and have your CPU-bound task run in a separate process (which is better suited than a thread for CPU-bound work), using ProcessPoolExecutor, and integrate it with asyncio in order to await its completion and return the result(s). Beware that it is important to protect the entry point of the program to avoid recursive spawning of subprocesses, etc.; that is, the code that starts the server must be guarded by if __name__ == '__main__'.

    Note that in the example below a new ProcessPool is created every time a request arrives at the / endpoint, but a more suitable approach would be to have a reusable ProcessPoolExecutor created at application startup instead, which you could add to request.state, as demonstrated in this answer (a rough sketch of that approach is included right after the example). Also, as explained earlier, the startup event is now deprecated, and you should rather use a lifespan event, as demonstrated in the linked answer provided at the beginning of this answer, as well as the one provided just above.

    Example

    from fastapi import FastAPI, Request
    from pydantic import BaseModel
    from typing import Optional
    import concurrent.futures
    import asyncio
    import uvicorn
    import torch


    class MyAIClass():
        def __init__(self) -> None:
            super().__init__()

        def ask_query(self, model, query, topN):
            ...  # your inference code here


    class Item_inference(BaseModel):
        text: str
        topN: Optional[int] = 1


    ai = MyAIClass()
    app = FastAPI()


    @app.on_event("startup")
    async def startup_event():
        app.state.model = torch.load('<model_path>')


    @app.post('/')
    async def your_endpoint(request: Request, item: Item_inference):
        model = request.app.state.model

        loop = asyncio.get_running_loop()
        # run the CPU-bound task in a separate process so the event loop is not blocked
        with concurrent.futures.ProcessPoolExecutor() as pool:
            res = await loop.run_in_executor(pool, ai.ask_query, model, item.text, item.topN)
        return res


    if __name__ == '__main__':
        uvicorn.run(app)
    

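    As mentioned above, here is a rough sketch of the reusable-pool variant (it assumes the same MyAIClass/ai, Item_inference and ask_query signature as in the example above, and stores both the model and the executor on app.state for simplicity):

    from contextlib import asynccontextmanager
    from fastapi import FastAPI, Request
    import concurrent.futures
    import asyncio
    import torch


    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # create the model and a single, reusable process pool once at startup
        app.state.model = torch.load('<model_path>')
        app.state.executor = concurrent.futures.ProcessPoolExecutor()
        yield
        # shut the pool down gracefully when the app stops
        app.state.executor.shutdown()


    app = FastAPI(lifespan=lifespan)


    @app.post('/')
    async def your_endpoint(request: Request, item: Item_inference):
        loop = asyncio.get_running_loop()
        # reuse the pool created at startup instead of spawning a new one per request
        res = await loop.run_in_executor(
            request.app.state.executor, ai.ask_query,
            request.app.state.model, item.text, item.topN
        )
        return res
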
    Using multiple workers

    Note that if you plan on having several workers active at the same time, each worker has its own memory—in other words, workers do not share the same memory—and hence, each worker will load its own instance of the ML model into memory (RAM). If, for instance, you are using four workers for your app, the model will end up being loaded into RAM four times. Thus, if the model, as well as other variables in your code, consume a large amount of memory, each process/worker will consume an equivalent amount of memory. If you would like to avoid that, you may have a look at how to share objects across multiple workers, and—if you are using Gunicorn as a process manager with Uvicorn workers—you can use Gunicorn's --preload flag. As per the documentation:

    Command line: --preload

    Default: False

    Load application code before the worker processes are forked.

    By preloading an application you can save some RAM resources as well as speed up server boot times. Although, if you defer application loading to each worker process, you can reload your application code easily by restarting workers.

    Example:

    gunicorn --workers 4 --preload --worker-class=uvicorn.workers.UvicornWorker app:app
    

    Note that you cannot combine Gunicorn's --preload with the --reload flag: when the code is preloaded into the master process, the new worker processes—which will automatically be created if your application code has changed—will still have the old code in memory, due to how fork() works.