Tags: postgresql, azure, azure-functions, microsoft-graph-api, microsoft-entra-id

Azure Function App - frequent hang and deadlocks


I am writing an Azure Function App with the Python SDK that has multiple Timer Triggers. Each timer trigger pulls all users or groups from Microsoft Entra ID with the help of the msgraph API and writes them to a Postgres DB, also hosted on Azure. As the title says, my Function App frequently shows hang and deadlock errors in the "Resource & Health" tab (roughly once every 2-3 days). We are on an EP1 App Service Plan.

Here is a short code example of the failing Trigger:

  1. PostgreSQLConnector, where I connect to my Postgres DB and insert:
class PostgreSQLConnector:
    def __init__(self):
        self.host = os.getenv("PG_HOST")
        self.dbname = os.getenv("PG_DB")
        self.user = os.getenv("PG_USER")
        self.password = os.getenv("PG_PASS")
        self.sslmode = "require"

    def __enter__(self):
        self.connect()
        self.cursor = self.connection.cursor()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.cursor.close()
        self.close_connection()

    def insert_many_into_table(self, table_name, columns, entries):
        sql_query = f"""
        INSERT INTO {table_name} ({', '.join(name for name, _ in columns)})
        VALUES ({', '.join(['%s'] * len(columns))})
        """
        try:
            batch_size = 10000
            batches = [entries[i:i + batch_size] for i in range(0, len(entries), batch_size)]
            commit_interval = 10
            executemany_count = 0
            for batch in batches:
                self.cursor.executemany(sql_query, batch)
                executemany_count += 1
                if executemany_count % commit_interval == 0:
                    self.connection.commit()
            self.connection.commit()
        except Exception as e:
            self.connection.rollback()
            raise Exception(f"Error committing changes: {e}")
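As an aside, the INSERT statement construction above can be pulled out into a small pure helper, which makes it easy to unit-test independently of the database. A minimal sketch (the helper name is hypothetical, not part of the original code):

```python
def build_insert_sql(table_name, columns):
    # columns is a list of (name, type) pairs, as in SchemaDefinitions
    col_list = ', '.join(name for name, _ in columns)
    placeholders = ', '.join(['%s'] * len(columns))
    return f"INSERT INTO {table_name} ({col_list}) VALUES ({placeholders})"

print(build_insert_sql("users", [("id", "text"), ("mail", "text")]))
# INSERT INTO users (id, mail) VALUES (%s, %s)
```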
  2. A msgraph API class instance where I authenticate and pull all users from Entra:
class GraphHelper:
    client_credential: ClientSecretCredential
    app_client: GraphServiceClient

    def __init__(self, tenant_id, client_id, client_secret):
        self.client_credential = ClientSecretCredential(
            tenant_id, client_id, client_secret
        )
        self.app_client = GraphServiceClient(self.client_credential)

    async def get_all_entra_users(self):
        request_configuration = UsersRequestBuilder.UsersRequestBuilderGetRequestConfiguration(
            query_parameters=UsersRequestBuilder.UsersRequestBuilderGetQueryParameters(
                select=self.default_user_select_properties,
                top=999
            )
        )
        all_users = []
        top_users = await self.app_client.users.get(request_configuration=request_configuration)
        for user in top_users.value:
            all_users.append(user)
        next_link = top_users.odata_next_link
        try:
            while next_link:
                top_users = await self.app_client.users.with_url(next_link).get(request_configuration=request_configuration)
                for user in top_users.value:
                    all_users.append(user)
                next_link = top_users.odata_next_link
        except Exception as e:
            logging.info(e)
        return all_users
  3. function_app.py trigger definition, pulling users from msgraph and writing them to the Postgres DB:
tenant_id = os.getenv("AZURE_TENANT_ID")
client_id = os.getenv("AZURE_CLIENT_ID")
client_secret = os.getenv("AZURE_CLIENT_SECRET")

graph_helper_instance = GraphHelper(tenant_id, client_id, client_secret)

@app.function_name(name="allEntraUsersTimerTrigger")
@app.timer_trigger(schedule="0 0 1 * * *", arg_name="allEntraUsersTimerTrigger", run_on_startup=False,
              use_monitor=False)
async def all_entra_users_timer_trigger(allEntraUsersTimerTrigger: func.TimerRequest) -> None:
    try:
        users = await graph_helper_instance.get_all_entra_users()
        user_entries = [tuple([getattr(user, key) for key, _ in SchemaDefinitions.columns_users]) for user in users]
        with PostgreSQLConnector() as postgres_helper_instance:
            postgres_helper_instance.drop_table(SchemaDefinitions.table_name_users)
            postgres_helper_instance.create_table(SchemaDefinitions.table_name_users, SchemaDefinitions.columns_users)
            postgres_helper_instance.insert_many_into_table(SchemaDefinitions.table_name_users, SchemaDefinitions.columns_users, user_entries)
    except Exception as e:
        # Error handling
        logging.exception(e)

There are around 50k users that I need to pull and write to the DB, so I would assume this qualifies as a heavy I/O function. Running this locally works without problems. Manually triggering the deployed Function App with a POST request also works without problems. The only problem is that it frequently hangs when it triggers on schedule. Is there something fundamentally wrong with how I have set up my code that could cause these hangs and deadlocks when it triggers automatically? Could it be that the Postgres writes are not async? Does anyone have an idea?
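On the "Postgres writes are not async" point: a synchronous database call inside an `async def` does block the event loop for its whole duration. One common way to keep the loop responsive is to push the blocking call onto a worker thread with `asyncio.to_thread` (Python 3.9+). A minimal sketch, with a stand-in function in place of the real insert:

```python
import asyncio

def blocking_insert(rows):
    # stand-in for the synchronous Postgres bulk insert
    return len(rows)

async def insert_off_loop(rows):
    # runs blocking_insert in a worker thread; while we await it,
    # other tasks on the event loop can make progress
    return await asyncio.to_thread(blocking_insert, rows)

print(asyncio.run(insert_off_loop([("u1",), ("u2",), ("u3",)])))  # 3
```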

If you need any more information, please let me know.


Solution

  • The original implementation, i.e. calling msgraph's with_url in a loop to follow the nextLink, was causing memory usage to hit the Azure Functions host memory limit of 1.5 GB:

    await self.app_client.users.with_url(next_link).get(request_configuration=request_configuration)
    

    I replaced the above implementation with the following async function:

    import aiohttp
    
    async def async_fetch(self, initial_url, expand_value=None, get_only_top=False, top=999, headers=None, consistent=False):
        token = self.get_access_token()
        # merge any caller-supplied headers instead of silently discarding them
        headers = {
            **(headers or {}),
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
        if consistent:
            headers["ConsistencyLevel"] = "eventual"
        result_list = []
        next_url = initial_url
        async with aiohttp.ClientSession() as client:
            while next_url:
                async with client.get(next_url, headers=headers) as response:
                    response_json = await response.json()
                    single_result = response_json.get('value', [])
                    result_list.extend(single_result)
                    next_url = response_json.get('@odata.nextLink')
        return result_list
    

    This function is triggered with:

    initial_url = f"https://graph.microsoft.com/v1.0/users?$top={top}&$select=id,deletedDateTime,accountEnabled,businessPhones,city"
    users = await self.async_fetch(initial_url)
    

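The paging logic in async_fetch (follow `@odata.nextLink` until it disappears) can also be expressed as a transport-agnostic helper, which makes it testable against canned pages without any network access. A sketch under that assumption (helper and fetcher names are hypothetical):

```python
def follow_pages(fetch_page, initial_url):
    # fetch_page: any callable mapping a URL to a parsed JSON dict
    results, url = [], initial_url
    while url:
        page = fetch_page(url)
        results.extend(page.get('value', []))
        url = page.get('@odata.nextLink')
    return results

# canned two-page response for illustration
pages = {
    "p1": {"value": [1, 2], "@odata.nextLink": "p2"},
    "p2": {"value": [3]},
}
print(follow_pages(pages.get, "p1"))  # [1, 2, 3]
```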
    In addition to that, I am calling a clear memory function to speed up memory release:

    import gc

    def clear_memory(function, local_vars):
        # `del var` alone would only unbind the loop variable; delete the
        # actual references and then trigger a collection
        for var in list(local_vars.keys()):
            del local_vars[var]
        gc.collect()
        logging.info(f'{function}: memory cleared')
    

    This approach helps manage memory usage more effectively, preventing the Azure Function host from stopping the execution due to high memory consumption.
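A further step in the same direction, if memory is still tight, is to avoid ever holding all ~50k users in one list: yield one page at a time and insert each page into the DB as it arrives, so peak memory is one page (up to 999 users) rather than the full result set. A rough stdlib-only sketch of the idea (names hypothetical):

```python
def iter_pages(fetch_page, initial_url):
    # generator: yields one page of items at a time instead of
    # accumulating the whole result set in memory
    url = initial_url
    while url:
        page = fetch_page(url)
        yield page.get('value', [])
        url = page.get('@odata.nextLink')

# canned pages standing in for Graph responses
pages = {
    "p1": {"value": ["a", "b"], "@odata.nextLink": "p2"},
    "p2": {"value": ["c"]},
}
for batch in iter_pages(pages.get, "p1"):
    print(len(batch))  # 2, then 1
```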

    Here is a metrics chart showing memory usage before and after:

    (Image: memory usage, before vs. after the change)