Search code examples
pythonasynchronousgoogle-apigoogle-api-python-client

Python parallel asynchronous processing of api calls from iterable


I'm creating an automation script for Google Workspace where it fetches an direct children of some organization unit and then fetches children of all these OUs at the same time. While searching the web for the answer whether multiprocessing, threading or asynchronous processing will work best for me I understood that asyncio is going to help me with this issue. I have created a class Google Tenant which holds the connection to google api and the fetched users. However, my problem right now is that the script still is not asynchronous but it works in sequence rather than making the calls asynchronously

from google.oauth2 import service_account
from googleapiclient.discovery import build

import logging
import asyncio

class GoogleTenant:
    def __init__(self, api: str, version: str):
        config: ScriptConfig = ScriptConfig()
        credentials = service_account.Credentials.from_service_account_file(config["gcloud"]["keypath"],
                                                                            scopes=SCOPES)
        delegated_credentials = credentials.with_subject(config["gcloud"]["subject"])
        self.service = build(api, version, credentials=delegated_credentials)
        self.users_list = []

    def fetch_users(self):
        users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute()
        ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]]
        asyncio.run(self._crawl_ous(ous_list))

    async def _crawl_ous(self, ous: list):
        crawling_result = await asyncio.gather(*[asyncio.create_task(self._fetch_users_from_ou(ou)) for ou in ous])
        for result in crawling_result:
            logging.info(f"Crawling result of ou {result[0]["organizations"][0]["department"]}: {len(result)}")
            self.users_list.extend(result)

    async def _fetch_users_from_ou(self, ou):
        call_parameters = {
            "customer": "my_customer",
            "maxResults": 500,
            "projection": "basic",
            "query": f"orgUnitPath='{str(ou)}'",
            "fields": "users/id,users/name,users/primaryEmail,users/suspended,users/emails,users/organizations/primary,users/organizations/department,users/recoveryEmail,nextPageToken"
        }
        logger.debug(f"Fetching users from {ou}")
        users_from_ou = self.service.users().list(**call_parameters).execute()
        user_fetching_result: list = users_from_ou["users"]
        logger.debug(f"Initial fetch from {ou}: {len(users_from_ou["users"])}")
        if "nextPageToken" in users_from_ou:
            next_page_token = users_from_ou["nextPageToken"]
        else:
            return user_fetching_result
        while True:
            users_from_ou = self.service.users().list(**call_parameters, pageToken=next_page_token).execute()
            logger.debug(f"Next fetch from {ou}: {len(users_from_ou["users"])}")
            user_fetching_result.extend(users_from_ou["users"])
            if "nextPageToken" in users_from_ou:
                next_page_token = users_from_ou["nextPageToken"]
            else:
                return user_fetching_result

if __name__ == '__main__':
    google_tenant = GoogleTenant("admin", "directory_v1")
    google_tenant.fetch_users()

The execution result of the following:

DEBUG:root:Fetching users from /example/child1
DEBUG:root:Initial fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 500
DEBUG:root:Next fetch from /example/child1: 258
DEBUG:root:Fetching users from /example/child2
DEBUG:root:Initial fetch from /example/child2: 500
DEBUG:root:Next fetch from /example/child2: 500
DEBUG:root:Next fetch from /example/child2: 500
...

I've tried to enter the await statement in places however I seem to misunderstand how it should as per my understanding the await statement makes the function wait for the result before continuing function execution. How can I make python execute these concurrently?

Update 1

I reformatted parts of the code as per @Michael Butscher suggestion and also added my imports in the previous block

    def fetch_users(self):
        users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute()
        ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]]
        logger.debug(f"Fetched and sanitized ous: {pprint.pformat(ous_list)}")
        asyncio.run(self._crawl_ous(ous_list))

    async def _crawl_ous(self, ous: list):
        tasks = [self._crawler_proxy(ou) for ou in ous]
        crawling_result = await asyncio.gather(*tasks)
        for result in crawling_result:
            logger.info(f"Crawling result: {len(result)}")
            self.users_list.extend(result)

    async  def _crawler_proxy(self, *args, **kwargs):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self._fetch_users_from_ou(*args, **kwargs))

Solution

  • Once again thank you @Juris and @Michael Butscher for the provided suggestions and help. In the end I used different approach. The simplest solution in this case turned out to be multiprocessing. I used the multiprocessing.Pool().map() function. Below is the modified code.

        def fetch_users(self) -> None:
            config = ScriptConfig()
            users_ous = self.service.orgunits().list(customerId="my_customer", orgUnitPath="/example", type="children", fields="organizationUnits/orgUnitPath").execute()
            ous_list = [ou["orgUnitPath"] for ou in users_ous["organizationUnits"]]
            logger.debug(f"Fetched and sanitized ous: {pprint.pformat(ous_list)}")
            self._crawl_ous(ous_list)
            logger.info(f"Fetched users: {len(self.users_list)}")
    
        def _crawl_ous(self, ous: list[str]) -> None:
            users_list = []
            with multiprocessing.Pool(processes=30) as pool:
                crawling_results = pool.map(self._fetch_users_from_ou, ous)
    
            for result in crawling_results:
                logger.info(f"Crawling result: {len(result)}")
                self.users_list.extend(result)
    
        def _fetch_users_from_ou(self, ou):
            call_parameters = {
                "customer": "my_customer",
                "maxResults": 500,
                "projection": "basic",
                "query": f"orgUnitPath='{str(ou)}'",
                "fields": "users/id,users/name,users/primaryEmail,users/suspended,users/emails,users/organizations/primary,users/organizations/department,users/recoveryEmail,nextPageToken"
            }
            logger.info(f"Fetching users from {ou}")
            users_from_ou: dict = self.service.users().list(**call_parameters).execute()
            logger.debug(f"Initial fetch from {ou}: {len(users_from_ou["users"])}")
            user_fetching_result: list = users_from_ou["users"]
            if "nextPageToken" in users_from_ou:
                next_page_token: str = users_from_ou["nextPageToken"]
            else:
                return user_fetching_result
            while True:
                users_from_ou: dict = self.service.users().list(**call_parameters, pageToken=next_page_token).execute()
                logger.debug(f"Next fetch from {ou}: {len(users_from_ou["users"])}")
                user_fetching_result.extend(users_from_ou["users"])
                if "nextPageToken" in users_from_ou:
                    next_page_token: str = users_from_ou["nextPageToken"]
                else:
                    return user_fetching_result