
Limit concurrency and control requests per minute with Python aiohttp?


There's a game named Guild Wars 2, and it provides APIs to query almost everything in the game database. My aim is to use Python asyncio and aiohttp to write a simple crawler and get all the items' info from the Guild Wars 2 game database.

I wrote a short program. It works, but it behaves kind of weirdly; I guess there's something I don't understand about composing coroutines.

First, I made a request with the Postman app. The response headers include X-Rate-Limit-Limit: 600, so I guess requests are limited to 600 per minute?

Here are my questions.

1. After the program finished, I checked some of the JSON files and they have the same content:

[{"name": "Endless Fractal Challenge Mote Tonic", "description": "Transform into a Challenge Mote for 15 minutes or until hit. You cannot move while transformed."......

which means the request got a bad response, but I don't know why.

2. I tried asyncio.Semaphore, but even when I limit concurrency to 5, the request count goes beyond 600 very soon. So I tried to control the timing by adding a time.sleep(0.2) at the end of the request_item function. I guessed that time.sleep(0.2) would suspend the whole Python process for 0.2 seconds, and it actually worked, but after running for some time the program hung for a long time and then reported a lot of failed attempts. Every automatic retry still failed. I'm confused by this behavior.

async def request_item(session, item_id):
    req_param_item = req_param
    req_param_item['ids'] = item_id
    # retry for 3 times when exception occurs.
    for i in range(3):
        try:
            async with session.get(url_template, params=req_param_item) as response:
                result = await response.json()
                with open(f'item_info/{item_id}.json', 'w') as f:
                    json.dump(result, f)
                print(item_id, 'done')
            break
        except Exception as e:
            print(item_id, i, 'failed')
            continue
    time.sleep(0.2)

When I move time.sleep(0.2) into the for loop inside the request_item function, the whole program hangs. I have no idea what is happening.

async def request_item(session, item_id):
    req_param_item = req_param
    req_param_item['ids'] = item_id
    for i in range(3):
        try:
            time.sleep(0.2)
            async with session.get(url_template, params=req_param_item) as response:
                result = await response.json()
                with open(f'item_info/{item_id}.json', 'w') as f:
                    json.dump(result, f)
                print(item_id, 'done')
            break
        except Exception as e:
            print(item_id, i, 'failed')
            continue

Could anyone explain this a little? And is there a better solution? I thought of some solutions, but I can't test them: for example, get loop.time() and suspend the whole event loop after every 600 requests. Or add 600 requests to task_list and gather them as a group, and after that's done, call asyncio.run(get_item(req_ids)) again with another 600 requests.

Here's all of my code.

import aiohttp
import asyncio
import httpx
import json
import math
import os
import time

tk = 'xxxxxxxx'
url_template = 'https://api.guildwars2.com/v2/items'

# get items list
req_param = {'access_token': tk}
item_list_resp = httpx.get(url_template, params=req_param)
items = item_list_resp.json()

async def request_item(session, item_id):
    req_param_item = req_param
    req_param_item['ids'] = item_id
    for i in range(3):
        try:
            async with session.get(url_template, params=req_param_item) as response:
                result = await response.json()
                with open(f'item_info/{item_id}.json', 'w') as f:
                    json.dump(result, f)
                print(item_id, 'done')
            break
        except Exception as e:
            print(item_id, i, 'failed')
            continue
    # since the game API limit requests, I think it's ok to suspend program for a while
    time.sleep(0.2)

async def get_item(item_ids: list):
    task_list = []
    async with aiohttp.ClientSession() as session:
        for item_id in item_ids:
            req = request_item(session, item_id)
            task = asyncio.create_task(req)
            task_list.append(task) 
        await asyncio.gather(*task_list)

asyncio.run(get_item(req_ids))

Solution

  • You are using time.sleep() instead of await asyncio.sleep(). It blocks the whole execution for N seconds, and it does so in the wrong place.

    Here is what happens. When you run

    for item_id in item_ids:
        req = request_item(session, item_id)
        task = asyncio.create_task(req)
        task_list.append(task)
    

    You only schedule your requests without running them. Say you have 1000 item_ids: you schedule 1000 tasks, and when you run await asyncio.gather(*task_list) you wait for all 1000 tasks to be executed. They all fire at once.

    But inside each task you call time.sleep(0.2), which blocks the whole event loop, so in total you have to wait about 1000 * 0.2 seconds. Remember that all tasks are started at once and finish in no guaranteed order. So task 1 runs and blocks everything for 0.2 seconds, then task 2 runs and blocks for 0.2 seconds, and so on up to task 999 and beyond.
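The difference between the two sleep calls can be seen in a small timing experiment. This is only an illustrative sketch: the names blocking_task, cooperative_task and timed are made up for the demo, and the 0.05 second delay stands in for the 0.2 seconds from the question.

```python
import asyncio
import time


async def blocking_task():
    # time.sleep() never yields to the event loop, so the tasks run one after another.
    time.sleep(0.05)


async def cooperative_task():
    # asyncio.sleep() suspends only this task; the other tasks keep running.
    await asyncio.sleep(0.05)


async def timed(factory, count):
    """Run `count` tasks concurrently and return the elapsed wall-clock time."""
    start = time.perf_counter()
    await asyncio.gather(*(factory() for _ in range(count)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_task, 10))
cooperative_elapsed = asyncio.run(timed(cooperative_task, 10))
print(f"time.sleep:    {blocking_elapsed:.2f}s")
print(f"asyncio.sleep: {cooperative_elapsed:.2f}s")
```

With time.sleep() the ten delays add up, while with asyncio.sleep() they overlap, so the second figure should be close to a single delay.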

    The simplest solution is to wait a minute after firing 600 requests. You need to slow down inside get_item. Example code (I have not tested it):

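    async def get_item(item_ids: list):
        task_list = []
        async with aiohttp.ClientSession() as session:
            # start=1 so the first pause comes after 600 requests, not after the first one
            for n, item_id in enumerate(item_ids, start=1):
                req = request_item(session, item_id)
                task = asyncio.create_task(req)
                task_list.append(task)
                if n % 600 == 0:
                    await asyncio.gather(*task_list)
                    await asyncio.sleep(60)
                    task_list = []
            # wait for the final, partial batch before the session closes
            await asyncio.gather(*task_list)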
    

    I recommend using the asyncio-throttle library.
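To show the idea behind throttling without an extra dependency, here is a minimal hand-rolled sketch. It is not the asyncio-throttle implementation or its API: IntervalLimiter and fake_request are hypothetical names, asyncio.sleep(0.01) stands in for the real session.get(...) call, and the demo uses 20 requests per second so it finishes quickly.

```python
import asyncio
import time


class IntervalLimiter:
    """Hypothetical helper: lets one caller through every `period / rate` seconds."""

    def __init__(self, rate, period):
        self._interval = period / rate
        self._lock = asyncio.Lock()
        self._next_start = 0.0

    async def wait(self):
        # The lock queues callers, so start times are handed out one at a time.
        async with self._lock:
            delay = self._next_start - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)  # non-blocking: other tasks keep running
            self._next_start = max(self._next_start, time.monotonic()) + self._interval


async def fake_request(limiter, semaphore, item_id, started):
    await limiter.wait()           # bounds how often requests start
    async with semaphore:          # bounds how many requests are in flight
        started.append(time.monotonic())
        await asyncio.sleep(0.01)  # placeholder for the real HTTP request
    return item_id


async def main():
    # Created inside the running loop so the lock binds to the right event loop.
    limiter = IntervalLimiter(rate=20, period=1.0)
    semaphore = asyncio.Semaphore(5)
    started = []
    results = await asyncio.gather(
        *(fake_request(limiter, semaphore, i, started) for i in range(10))
    )
    return results, started


results, started = asyncio.run(main())
print(results)
```

For the limit in the question, rate=600 and period=60 would space request starts 0.1 seconds apart. The semaphore is kept separate because it only caps the number of simultaneous requests; on its own it does not cap requests per minute, which is why a Semaphore of 5 can still exceed 600 requests quickly.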

    PS. With a rate limit of 600 per minute, I do not think you need asyncio, because I am pretty sure that 600 concurrent requests will complete in 5-10 seconds. Double-check whether your 600 requests take more than 1 minute with the classic requests library and threads.
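A possible contributor to the identical JSON files in question 1, offered as a hypothesis rather than a confirmed diagnosis: in request_item, the line req_param_item = req_param does not copy the dictionary, so every task writes its item id into the same shared object. Any code that reads the parameters after an await, such as a retry, may then see an id written by another task. The sketch below reproduces the effect with plain asyncio; aliased, copied and shared_params are made-up names, and no HTTP request is involved.

```python
import asyncio

shared_params = {"access_token": "xxxxxxxx"}


async def aliased(item_id):
    params = shared_params      # same dict object, not a copy
    params["ids"] = item_id
    await asyncio.sleep(0)      # any await lets other tasks run and overwrite "ids"
    return params["ids"]


async def copied(item_id):
    params = {**shared_params, "ids": item_id}  # fresh dict for every call
    await asyncio.sleep(0)
    return params["ids"]


async def main():
    ids = [1, 2, 3]
    seen_aliased = await asyncio.gather(*(aliased(i) for i in ids))
    seen_copied = await asyncio.gather(*(copied(i) for i in ids))
    return seen_aliased, seen_copied


seen_aliased, seen_copied = asyncio.run(main())
print(seen_aliased)  # [3, 3, 3]
print(seen_copied)   # [1, 2, 3]
```

Building a new dictionary per call, as in copied, keeps each task's parameters independent of the others.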