Search code examples
azure-functionspython-asyncioaiohttpazure-python-sdk

Azure function app python v2 model - unclosed aiohttp session


I am writing an Azure Function App using Python (v2 programming model) that calls an API multiple times to get some data and write the responses to a blob storage. Those operations happen asyncronously, and I am using aiohttp to make the API calls. This is the code of my function app:

import azure.functions as func
from azure.storage.blob.aio import BlobServiceClient
import asyncio
import aiohttp
import datetime
import logging
import requests
import json
import sys
import os
import ssl
import certifi

_KV_SECRET_OUTPUT_BLOB_CONNECTION_STRING = os.environ["KV_SECRET_OUTPUT_BLOB_CONNECTION_STRING"]
_KV_SECRET_API_KEY = os.environ["KV_SECRET_API_KEY"]
CRON_SCHEDULE = os.environ["CRON_SCHEDULE"]
STORAGE_CONTAINER_NAME = os.environ["STORAGE_CONTAINER_NAME"]



def timestamp_utc():
    return datetime.datetime.now(datetime.timezone.utc)

def timestamp_for_logs():
    return timestamp_utc().isoformat()

def timestamp_for_blob_path():
    return timestamp_utc().strftime("%Y/%m/%d/T/%H/%M/%S_%Z")


async def makeAPIcall(api_endpoint, session, request_params):
    logging.info(f'attempting to make an API call... with request_params: {request_params}')
    try:
        async with session.get(api_endpoint, params = request_params) as response:
            response.raise_for_status()
            response_content = await response.read()
            logging.info("Response from the API call: "+str(response_content))
            return response_content
                
    except Exception as e:
        logging.error(f'Error calling API:')
        logging.error(str(sys.exc_info()))
        return None


async def writeResponseToBlob(data, container_name, blob_name, connection_string):
    logging.info(f'attempting to write to blob with container name: {container_name} and blob name: {blob_name}')
    try:
        # Create the BlobServiceClient object using the connection string
        blob_service_client =  BlobServiceClient.from_connection_string(connection_string)

        # Get a ContainerClient object for the specified container
        container_client = blob_service_client.get_container_client(container_name)

        # Create a blob in the container with the specified name
        blob_client = container_client.get_blob_client(blob_name)

        # Write the API response to the blob
        await blob_client.upload_blob(data, overwrite=True)

    except:
        logging.error("ERROR during blob upload : "+str(sys.exc_info()))
        sys.exit(1)


async def chain(base_url, satellite_id, request_params, context, container_name, session) -> None:
    # part A of the chain
    api_endpoint = f"{base_url}/{satellite_id}"
    response_content = await makeAPIcall(api_endpoint, session, request_params=request_params)
    
    # part B of the chain
    output_blob_name = timestamp_for_blob_path()+'_SatID_'+str(satellite_id)+'_InvocID_'+context.invocation_id+'.json'
    
    await writeResponseToBlob(data=response_content, 
                              container_name=container_name, 
                              blob_name=output_blob_name, 
                              connection_string=_KV_SECRET_OUTPUT_BLOB_CONNECTION_STRING)
    
    logging.info(f"Finished entire chain for SatID {satellite_id}")


async def main(args, session):

    base_url = args["base_url"]
    satellite_ids =  args["satellite_ids"]
    request_params =  args["request_params"]
    context = args["context"]
    container_name = args["container_name"]
    
    # version A
    # await asyncio.gather(*(chain(base_url, satellite_id, request_params, context, container_name, session) for satellite_id in satellite_ids))

    # version B
    for satellite_id in satellite_ids:
        await chain(base_url, satellite_id, request_params, context, container_name, session)



#===============
# CORE FUNCTION:
#===============

app = func.FunctionApp()
@app.function_name(name="sendReqToApiWriteToBlobTimeTrigger")
@app.schedule(schedule=CRON_SCHEDULE, arg_name="mytimer", run_on_startup=False, use_monitor=True)

async def eventHandler(mytimer: func.TimerRequest, context: func.Context) -> None:
    
    logging.info('ctx_func_name:\t'+context.function_name),
    logging.info('ctx_func_dir:\t'+context.function_directory),
    logging.info('ctx_invocation_id:\t'+context.invocation_id),

    logging.info("Cron schedule UTC:\t"+CRON_SCHEDULE)

    # satellite_ids = [25544, 25544]
    satellite_ids = ["Amsterdam", "London"]
    # base_url = f"https://api.wheretheiss.at/v1/satellites"
    base_url = f"https://worldtimeapi.org/api/timezone/Europe"

    request_params = {}

    if mytimer.past_due:
        logging.info('The timer is past due!')


    args = {
        "base_url": base_url,
        "request_params": request_params,
        "satellite_ids" : satellite_ids,
        "context" : context,
        "container_name" : STORAGE_CONTAINER_NAME,
    }

    ssl_context = ssl.create_default_context(cafile=certifi.where())
    async with aiohttp.TCPConnector(ssl=ssl_context) as connector:
        async with aiohttp.ClientSession(connector=connector) as session:
            await main(args, session)
    
    # ensure session is closed
    await asyncio.sleep(1)
    await session.close()
    await asyncio.sleep(1)
    await connector.close()
    logging.info(f"Entire function execution finished")

Now the problem is that, at least when I simulate locally in VSCode, the function gives an error (or warning actually because the debugging does not stop it) that the client session and connector are unclosed:

[2023-12-12T22:41:00.524Z] Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000001E9A492CCD0>
[2023-12-12T22:41:00.526Z] Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x000001E9A48D7520>, 982955.203)]']
connector: <aiohttp.connector.TCPConnector object at 0x000001E9A492C7F0>

I did follow the recommendation from aiohttp for graceful shotdown, as well as the recommendation to re-use the session, but strangely at every function execution, I make 2 requests to the API, and I seem to get an error at every request. What can be the issue, and how can I resolve the error?

EDIT 2024.01.15:

When I remove the await in this function:

async def writeResponseToBlob(data, container_name, blob_name, connection_string):
       ...
        # Write the API response to the blob
        # await blob_client.upload_blob(data, overwrite=True)
        blob_client.upload_blob(data, overwrite=True)

then the problem disappears, and instead I get a warning:

...\function_app.py: RuntimeWarning: coroutine 'BlobClient.upload_blob' was never awaited

But how to make sure that this function works with both await on api call, and on blob upload?


Solution

  • turns out the problem was with the writeResponseToBlob method and what was missing was async with for the blob service client.

    So this method now looks like this:

    async def writeResponseToBlob(data, connection_string, container_name, blob_name):
        logging.info(f'attempting to write to blob with container name: {container_name} and blob name: {blob_name} ...')
        try:
            async with BlobServiceClient.from_connection_string(connection_string) as blob_service_client:
                # Get a ContainerClient object for the specified container
                container_client = blob_service_client.get_container_client(container_name)
                # Write the API response to the blob
                blob_client = await container_client.upload_blob(name=blob_name, data=data, overwrite=True)
            return blob_client.blob_name
        except:
            logging.error("ERROR during blob upload : "+str(sys.exc_info()))
            return None
    

    and that fixed the issues with the unclosed aiohttp session/connector.