I'm trying to add functionality to my FastAPI application that asynchronously updates an ML model (pulled from an S3 bucket). The idea is to refresh the model once hourly without blocking the API's ability to respond to requests, such as CPU-bound model inference requests.
# Global model variable
ml_model = None

# Async function to download model from S3
async def download_model_from_s3(path="integration-tests/artifacts/MULTI.joblib"):
    global ml_model
    s3_client = boto3.client("s3")
    bucket = os.environ.get("BUCKET_BUCKET", "artifacts_bucket")
    try:
        local_model_path = './model.joblib'
        download_coroutine = s3_client.download_file(bucket, path, local_model_path)
        await download_coroutine
        ml_model = joblib.load(local_model_path)
        logging.info("Model updated.")
    except Exception as e:
        logging.exception(f"Error downloading or loading model: {e}")
# Asynchronous scheduler function that updates the model every interval
async def scheduler(bucket_name: str, model_key: str, interval=60):
    while True:
        # Sleep for the specified interval (in minutes)
        await asyncio.sleep(interval * 60)
        # Call the download function to update the model
        await download_model_from_s3(bucket_name, model_key)
app = FastAPI()

# Startup event to start the scheduler
@app.on_event("startup")
async def startup_event():
    # BLOCKING: Download the model once at startup to ensure it is available
    download_model_from_s3()  # Blocking, ensures model is available
    # Start the scheduler to update the model every 60 minutes (async, non-blocking)
    await scheduler(bucket_name, model_key, interval=60)
I'm familiar with FastAPI but relatively new to async programming. My question: is this the right way to asynchronously pull data from S3? Is a separate async S3 client required?

I've opted for a while True loop over explicitly scheduling the job once hourly, but I'm uncertain whether this technique is appropriate.
Your approach is on the right track, but there are a few adjustments we can make to improve it. Here's a revised version with explanations:
import asyncio
import aioboto3
from fastapi import FastAPI
import joblib
import logging
import os

# Global model variable
ml_model = None

# Async function to download model from S3
async def download_model_from_s3(bucket, path="integration-tests/artifacts/MULTI.joblib"):
    global ml_model
    session = aioboto3.Session()
    async with session.client("s3") as s3_client:
        try:
            local_model_path = './model.joblib'
            await s3_client.download_file(bucket, path, local_model_path)
            ml_model = joblib.load(local_model_path)
            logging.info("Model updated.")
        except Exception as e:
            logging.exception(f"Error downloading or loading model: {e}")

# Asynchronous scheduler function
async def scheduler(bucket: str, model_key: str, interval: int = 60):
    while True:
        await download_model_from_s3(bucket, model_key)
        await asyncio.sleep(interval * 60)

app = FastAPI()

@app.on_event("startup")
async def startup_event():
    bucket = os.environ.get("BUCKET_BUCKET", "artifacts_bucket")
    model_key = "integration-tests/artifacts/MULTI.joblib"
    # Initial model download
    await download_model_from_s3(bucket, model_key)
    # Start the scheduler
    asyncio.create_task(scheduler(bucket, model_key, interval=60))

# Your API routes here
Key changes and explanations:

1. Use aioboto3: this is an async version of boto3 and the appropriate choice inside async functions.
2. Async context manager: use async with session.client("s3") as s3_client to properly manage the async S3 client.
3. Make download_model_from_s3 fully async: changed to use the async S3 client methods.
4. Scheduler logic: moved the download call before the sleep, so the model updates immediately on startup and then every hour after.
5. Startup event: made it async and used asyncio.create_task() to start the scheduler without blocking.
6. Error handling: kept the try/except block to handle potential errors during model download.
This approach is more aligned with FastAPI's async nature. A while True loop with asyncio.sleep() is a valid way to schedule recurring tasks in async Python; it's simple and works well for this use case.
Remember that this scheduler runs indefinitely. If you need more complex scheduling or want to be able to stop/start the scheduler, you might want to look into libraries like APScheduler or create a more sophisticated scheduling mechanism.
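If stop/start control is all you need, you don't necessarily need a scheduling library: wrapping the loop in a task and cancelling it is often enough. A minimal sketch (the scheduler and update names here are illustrative, and the short interval is just for demonstration):

```python
import asyncio

async def scheduler(interval: float, update) -> None:
    # Periodic loop; cancelling the wrapping task stops it cleanly.
    try:
        while True:
            await update()
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        # Optional: cleanup before the task actually exits.
        raise

async def main() -> int:
    calls = []

    async def update():
        calls.append(1)

    # Start the scheduler in the background, then stop it on demand.
    task = asyncio.create_task(scheduler(0.01, update))
    await asyncio.sleep(0.035)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(calls)
```

In a FastAPI app you would keep the task object around (e.g. on app.state) and cancel it in a shutdown handler, which gives you a clean stop without extra dependencies.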
Also, note that loading the model with joblib.load() is a CPU-bound operation. If it's a large model, it might still block the event loop momentarily. Consider running it in a separate process if it becomes a bottleneck.
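As a sketch of that idea, the blocking load can be pushed off the event loop with asyncio.to_thread (Python 3.9+). The load_model function below is a stand-in for the real joblib.load call:

```python
import asyncio

def load_model(path):
    # Stand-in for joblib.load(path): any blocking, CPU-bound call goes here.
    return {"path": path}

async def refresh_model(path="./model.joblib"):
    # asyncio.to_thread runs the blocking call in the default thread pool,
    # so the event loop keeps serving requests while the model loads.
    model = await asyncio.to_thread(load_model, path)
    return model
```

For very large models where the GIL matters, loop.run_in_executor with a concurrent.futures.ProcessPoolExecutor does the same job in a separate process instead of a thread.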