I'm studying how to serve artificial intelligence modules through FastAPI.
I created a FastAPI app that answers questions using a pre-trained machine learning model.
It works fine for a single user, but when multiple users use it at the same time, the responses become very slow.
So, when multiple users submit questions at the same time, is there a way to keep the model already loaded and share it, instead of loading it for every request?
```python
class sentencebert_ai():
    def __init__(self) -> None:
        super().__init__()

    def ask_query(self, query, topN):
        startt = time.time()
        ask_result = []
        score = []
        result_value = []
        embedder = torch.load(model_path)
        corpus_embeddings = embedder.encode(corpus, convert_to_tensor=True)
        query_embedding = embedder.encode(query, convert_to_tensor=True)
        cos_scores = util.pytorch_cos_sim(query_embedding, corpus_embeddings)[0]  # torch.Size([121]): cosine similarity values for the 121 corpus entries
        cos_scores = cos_scores.cpu()
        top_results = np.argpartition(-cos_scores, range(topN))[0:topN]
        for idx in top_results[0:topN]:
            ask_result.append(corpusid[idx].item())
            # .item() is used to access the number inside a tensor such as tensor(5)
            score.append(round(cos_scores[idx].item(), 3))
        # build the output as a JSON array to send from the server
        for i, e in zip(ask_result, score):
            result_value.append({"pred_id": i, "pred_weight": e})
        endd = time.time()
        print('time check', endd - startt)
        return result_value
        # return ','.join(str(e) for e in ask_result),','.join(str(e) for e in score)


class Item_inference(BaseModel):
    text: str
    topN: Optional[int] = 1


@app.post("/retrieval", tags=["knowledge recommendation"])
async def Knowledge_recommendation(item: Item_inference):
    # db.append(item.dict())
    item.dict()
    results = _ai.ask_query(item.text, item.topN)
    return results


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", default='9003', type=int)
    # parser.add_argument("--mode", default='cpu', type=str, help='cpu for CPU mode, gpu for GPU mode')
    args = parser.parse_args()

    _ai = sentencebert_ai()
    uvicorn.run(app, host="0.0.0.0", port=args.port, workers=4)
```
Corrected version:
```python
@app.post("/aaa")
def your_endpoint(request: Request, item: Item_inference):
    start = time.time()
    model = request.app.state.model
    item.dict()  # needed when running the kernel
    _ai = sentencebert_ai()
    results = _ai.ask_query(item.text, item.topN, model)
    end = time.time()
    print(end - start)
    return results
```
First, you should not load your model every time a request arrives, but rather load it once at startup (you could use the `startup` event for this) and store it on the app instance, using the generic `app.state` attribute (see the implementation of `State` too), from which you can later retrieve it, as described here and here.

Update: The `startup` event has recently been deprecated, and since then, the recommended way to handle startup and shutdown events is the `lifespan` handler, as demonstrated in this answer. You might still find the references provided earlier useful, as they cover additional concepts in FastAPI. For now, you could keep using the `startup` event, but it is recommended not to, as it might be removed entirely from future FastAPI/Starlette versions.
For instance:
```python
from fastapi import Request

@app.on_event("startup")
async def startup_event():
    app.state.model = torch.load('<model_path>')
```
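If you would rather switch to the `lifespan` approach right away, a minimal sketch (assuming the same `torch.load()` call as in the snippet above) could look like this; the code before `yield` runs once at startup and the code after it runs at shutdown:
```python
from contextlib import asynccontextmanager
from fastapi import FastAPI
import torch

@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup: load the model once and store it on the app instance
    app.state.model = torch.load('<model_path>')
    yield
    # shutdown: release the reference, if needed
    app.state.model = None

app = FastAPI(lifespan=lifespan)
```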
Second, if you do not have any `async def` functions inside your endpoint that you have to `await`, you could define your endpoint with a normal `def` instead. In this way, FastAPI will run requests to that `def` endpoint in a separate thread from an external threadpool, which is then `await`ed (so that the blocking operations inside won't block the event loop); whereas `async def` endpoints run directly in the event loop, and thus any synchronous blocking operations inside them would block the event loop. Please have a look at the answers here and here, as well as all the references included in them, in order to understand the concept of `async`/`await`, as well as the difference between using `def` and `async def` in FastAPI. Example with a normal `def` endpoint:
```python
@app.post('/')
def your_endpoint(request: Request):
    model = request.app.state.model
    # run your synchronous ask_query() function here
```
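For instance, reusing the `Item_inference` model and the `_ai` instance from your question, and assuming `ask_query()` has been adjusted to accept the pre-loaded model as an argument (the exact argument order is up to you), the endpoint might look something like this:
```python
@app.post('/retrieval')
def knowledge_recommendation(request: Request, item: Item_inference):
    model = request.app.state.model  # model loaded once at startup
    # synchronous, CPU-bound call; runs in the external threadpool
    return _ai.ask_query(model, item.text, item.topN)
```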
Alternatively, as described here, you could use an `async def` endpoint and have your CPU-bound task run in a separate process (which is better suited than using a thread), using `ProcessPoolExecutor`, and integrate it with `asyncio`, in order to `await` for it to complete and return the result(s). Beware that it is important to protect the main loop of code to avoid recursive spawning of subprocesses, etc.; that is, your code must be under `if __name__ == '__main__'`.
Note that in the example below a new `ProcessPoolExecutor` is created every time a request arrives at the `/` endpoint, but a more suitable approach would be to have a reusable `ProcessPoolExecutor` created at application startup instead, which you could add to `request.state`, as demonstrated in this answer (and in the sketch after the example below). Also, as explained earlier, the `startup` event is now deprecated, and you should rather use a `lifespan` handler, as demonstrated in the linked answer provided at the beginning of this answer, as well as the one provided just above.
```python
from fastapi import FastAPI, Request
from pydantic import BaseModel
from typing import Optional
import concurrent.futures
import asyncio
import uvicorn
import torch

class MyAIClass():
    def __init__(self) -> None:
        super().__init__()

    def ask_query(self, model, query, topN):
        ...  # your inference code goes here

class Item_inference(BaseModel):
    text: str
    topN: Optional[int] = 1

ai = MyAIClass()
app = FastAPI()

@app.on_event("startup")
async def startup_event():
    app.state.model = torch.load('<model_path>')

@app.post('/')
async def your_endpoint(request: Request, item: Item_inference):
    model = request.app.state.model
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, ai.ask_query, model, item.text, item.topN)
    return res

if __name__ == '__main__':
    uvicorn.run(app)
```
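And here is a rough sketch of the reusable-executor variant mentioned above, using the `lifespan` handler and storing both the model and the pool on `app.state` (it reuses `ai` and `Item_inference` from the example above; treat it as a sketch, not a drop-in implementation):
```python
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
import concurrent.futures
import asyncio
import torch

@asynccontextmanager
async def lifespan(app: FastAPI):
    # create the model and a reusable process pool once, at application startup
    app.state.model = torch.load('<model_path>')
    app.state.pool = concurrent.futures.ProcessPoolExecutor()
    yield
    # shut the pool down gracefully at application shutdown
    app.state.pool.shutdown()

app = FastAPI(lifespan=lifespan)

@app.post('/')
async def your_endpoint(request: Request, item: Item_inference):
    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(
        request.app.state.pool,
        ai.ask_query, request.app.state.model, item.text, item.topN
    )
    return res
```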
Note that if you plan on having several workers active at the same time, each worker has its own memory (in other words, workers do not share the same memory), and hence each worker will load its own instance of the ML model into memory (RAM). If, for instance, you are using four workers for your app, the model will be loaded into RAM four times. Thus, if the model, as well as other variables in your code, consume a large amount of memory, each process/worker will consume an equivalent amount of memory. If you would like to avoid that, you may have a look at how to share objects across multiple workers, or, if you are using Gunicorn as a process manager with Uvicorn workers, you can use Gunicorn's `--preload` flag. As per the documentation:
Command line:
--preload
Default:
False
Load application code before the worker processes are forked.
By preloading an application you can save some RAM resources as well as speed up server boot times. Although, if you defer application loading to each worker process, you can reload your application code easily by restarting workers.
Example:
gunicorn --workers 4 --preload --worker-class=uvicorn.workers.UvicornWorker app:app
Note that you cannot combine Gunicorn's `--preload` flag with the `--reload` flag: when the code is preloaded into the master process, the new worker processes (which will automatically be created if your application code has changed) will still have the old code in memory, due to how `fork()` works.