Search code examples
pythonasynchronousrequestpython-asyncioaiohttp

Asyncio lock not working when fetching data from API asynchronously


My function below is not working as intended. It is an async function, and I want it so that when the response is a 401 and the lock is not locked, it removes the first value in api_keys. When the lock is locked, I want it to retry, as api_keys is currently being modified. However, the output prints multiple status code 401s in a row, but my code should change the current api key when a 401 is returned and should work from there.

api_keys = []
lock = asyncio.Lock()

async def fetch_data(session, url, params, delay):
    await asyncio.sleep(delay)

    global api_keys
    global lock

    while True:
        if lock.locked():
            print("Lock is locked, continuing...")
            continue

        os.environ["API_KEY"] = api_keys[0]
        params["api_key"] = os.getenv("API_KEY")

        async with session.get(url, params=params) as response:
            if response.status == 200:
                data = await response.json()
                print("Remaining requests", response.headers["x-requests-remaining"])
                print("Used requests", response.headers["x-requests-used"])
                return data
            else:
                print(
                    f"Failed to get odds: status_code {response.status}, response body {await response.text()}".rstrip()
                )
                if response.status == 401:
                    async with lock:
                        if len(api_keys) > 1:
                            print("Using next API key in list")
                            api_keys.pop(0)
                            continue
                        else:
                            print("No more API keys available")
                            return None

                return None

I first tried without the if lock.locked() line, and also moving the async with lock line around the while loop, and just under the loop, to no avail.


Solution

  • while True: if condition: continue could create a busy loop - that's bad already. What is even worse, the condition cannot change in such loop (in asyncio unlike multi-threading). Once entered, it will be infinite.

    The lock for api_keys because it might be currently modified does not make big sense in asyncio where (again unlike multi-threading), task switch cannot occur anywhere.

    If I understand the logic correctly, an api_key is being re-used many times up to the first 401 error and then should be retired. In the current program if multiple tasks would get that 401 error, all of them would delete the first key from the list, not only the key to be retired.

    My proposal is to make a central key storage. This would solve problem #1: task cannot continue without a key and also problem #2: a key must be retired only once. Below is a stub without implementation.

    There is no information in your question how new keys are being obtained, I left that out.

    async def get_key():
        """
        Return the first API key from the key list.
        Wait while the key list is empty (asyncio.Event suggested).
    
        Do:   key = await get_key()   before each API transaction.
        """
    
    def retire_key(key):
        """
        Remove this key from the beginning of the key list.
        Do nothing if the key is not there. Presumably it has
        been removed by some other key user.
    
        Do:   retire_key(key)    after a 401 error
        """