Tags: python, amazon-s3, async-await, fastapi

Asynchronous S3 downloads in FastAPI


I'm trying to add some functionality to my FastAPI application that asynchronously updates an ML model (pulled from an S3 bucket). The idea is to refresh this model once an hour without blocking the API's ability to respond to requests, such as CPU-bound model inference.

# Global model variable
ml_model = None

# Async function to download model from S3
async def download_model_from_s3(path="integration-tests/artifacts/MULTI.joblib"):
    global ml_model
    s3_client = boto3.client("s3")
    bucket = os.environ.get("BUCKET_BUCKET", "artifacts_bucket")
    try:
        local_model_path = './model.joblib'
        download_coroutine = s3_client.download_file(bucket, path, local_model_path)
        await download_coroutine
        ml_model = joblib.load(local_model_path)
        logging.info(f"Model updated.")

    except Exception as e:
        logging.exception(f"Error downloading or loading model: {e}")

# Asynchronous scheduler function that updates the model every interval
async def scheduler(bucket_name: str, model_key: str, interval=60):
    while True:
        # Sleep for the specified interval (in minutes)
        await asyncio.sleep(interval * 60)
        # Call the download function to update the model
        await download_model_from_s3(bucket_name, model_key)

app = FastAPI()

# Startup event to start the scheduler
@app.on_event("startup")
async def startup_event():
    # Download the model once at startup (blocking) to ensure it is available
    download_model_from_s3()
    # Start the scheduler to update the model every 60 minutes (async, non-blocking)
    await scheduler(bucket_name, model_key, interval=60)

I’m familiar with FastAPI but relatively new to async programming. My question: Is this the right way to asynchronously pull data from S3? Is a separate async S3 client required?

I’ve opted to use a while True loop rather than explicitly scheduling the job once an hour. However, I’m not sure whether this technique is appropriate.


Solution

  • Your approach is on the right track, but there are a few adjustments we can make to improve it. Here's a revised version with explanations:

    import asyncio
    import aioboto3
    from fastapi import FastAPI
    import joblib
    import logging
    import os
    
    # Global model variable
    ml_model = None
    
    # Async function to download model from S3
    async def download_model_from_s3(bucket, path="integration-tests/artifacts/MULTI.joblib"):
        global ml_model
        session = aioboto3.Session()
        async with session.client("s3") as s3_client:
            try:
                local_model_path = './model.joblib'
                await s3_client.download_file(bucket, path, local_model_path)
                ml_model = joblib.load(local_model_path)
                logging.info("Model updated.")
            except Exception as e:
                logging.exception(f"Error downloading or loading model: {e}")
    
    # Asynchronous scheduler function
    async def scheduler(bucket: str, model_key: str, interval: int = 60):
        while True:
            # Wait for the interval, then refresh the model (the first load happens at startup)
            await asyncio.sleep(interval * 60)
            await download_model_from_s3(bucket, model_key)
    
    app = FastAPI()
    
    @app.on_event("startup")
    async def startup_event():
        bucket = os.environ.get("BUCKET_BUCKET", "artifacts_bucket")
        model_key = "integration-tests/artifacts/MULTI.joblib"
        
        # Initial model download, awaited so the model is ready before the app starts serving
        await download_model_from_s3(bucket, model_key)
        
        # Start the scheduler
        asyncio.create_task(scheduler(bucket, model_key, interval=60))
    
    # Your API routes here
    

    Key changes and explanations:

    1. Use aioboto3: This is an async wrapper around boto3. Plain boto3 is synchronous: its download_file call blocks and returns None rather than a coroutine, so awaiting it (as in the original code) doesn't work. aioboto3 provides S3 client methods that are genuinely awaitable.

    2. Async context manager: Use async with session.client("s3") as s3_client to properly manage the async S3 client.

    3. Make download_model_from_s3 fully async: Changed to use the async S3 client methods.

    4. Scheduler logic: The initial download is awaited in the startup event, so the model is guaranteed to be loaded before the API starts serving requests; the scheduler task then sleeps for the interval and refreshes the model every hour, without downloading it a second time at startup.

    5. Startup event: The initial download is now awaited (calling an async function without await only creates a coroutine object and never runs it), and the scheduler is started with asyncio.create_task() rather than being awaited directly, which would have blocked startup indefinitely.

    6. Error handling: Kept the try/except block to handle potential errors during model download.

    This approach is more aligned with FastAPI's async nature. The while True with asyncio.sleep() is a valid way to schedule recurring tasks in async Python. It's simple and works well for this use case.

    Remember that this scheduler runs indefinitely. If you need more complex scheduling or want to be able to stop/start the scheduler, you might want to look into libraries like APScheduler or create a more sophisticated scheduling mechanism.
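
    For example, here is a minimal sketch of making the loop stoppable without any extra library: replace the startup event above with one that keeps a handle to the background task, and cancel that task in a shutdown handler. The refresh_task variable and handler names are illustrative, and scheduler() and download_model_from_s3() are the functions defined above:

    # Module-level handle to the background task so it can be cancelled later
    refresh_task = None

    @app.on_event("startup")
    async def startup_event():
        global refresh_task
        bucket = os.environ.get("BUCKET_BUCKET", "artifacts_bucket")
        model_key = "integration-tests/artifacts/MULTI.joblib"
        # Initial, awaited download as before
        await download_model_from_s3(bucket, model_key)
        # Keep a reference to the task instead of discarding it
        refresh_task = asyncio.create_task(scheduler(bucket, model_key, interval=60))

    @app.on_event("shutdown")
    async def shutdown_event():
        # Cancel the refresh loop and wait for it to exit cleanly
        if refresh_task is not None:
            refresh_task.cancel()
            try:
                await refresh_task
            except asyncio.CancelledError:
                pass

    Cancelling the task raises CancelledError inside the scheduler's await asyncio.sleep(), which ends the loop.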

    Also, note that loading the model with joblib.load() is a CPU-bound operation. If it's a large model, it might still block the event loop momentarily. Consider running it in a separate process if it becomes a bottleneck.
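
    For example, here is a sketch of the separate-process idea, assuming the loaded model object can be pickled back to the parent process (typically true for scikit-learn estimators). The load_model_off_loop helper and _load_executor are illustrative names, not part of the code above:

    import asyncio
    from concurrent.futures import ProcessPoolExecutor

    import joblib

    # One dedicated worker process for model loading
    _load_executor = ProcessPoolExecutor(max_workers=1)

    async def load_model_off_loop(local_model_path: str):
        # joblib.load runs in the worker process, so the event loop stays free;
        # the resulting object is pickled back to this process.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(_load_executor, joblib.load, local_model_path)

    In download_model_from_s3 you would then write ml_model = await load_model_off_loop(local_model_path) instead of calling joblib.load directly. A lighter option is asyncio.to_thread(joblib.load, local_model_path) (Python 3.9+), which avoids pickling the model back, but a thread only helps to the extent the load releases the GIL.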