Tags: python, web-scraping, python-asyncio, python-requests-html

Limiting number of concurrent AsyncIO tasks using Semaphore not working


Objective:

I am trying to scrape multiple URLs simultaneously. I don't want to make too many requests at the same time, so I am using this solution to limit it.

Problem:

Requests are being made for ALL tasks instead of for a limited number at a time.

Stripped-down Code:

import asyncio
import os
import random
import time


async def download_all_product_information():
    # TO LIMIT THE NUMBER OF CONCURRENT REQUESTS
    async def gather_with_concurrency(n, *tasks):
        semaphore = asyncio.Semaphore(n)

        async def sem_task(task):
            async with semaphore:
                return await task

        return await asyncio.gather(*(sem_task(task) for task in tasks))

    # FUNCTION TO ACTUALLY DOWNLOAD INFO
    async def get_product_information(url_to_append):
        url = 'https://www.amazon.com.br' + url_to_append

        print('Product Information - Page ' + str(current_page_number) + ' for category ' + str(
            category_index) + '/' + str(len(all_categories)) + ' in ' + gender)

        source = await get_source_code_or_content(url, should_render_javascript=True)
        time.sleep(random.uniform(2, 5))

        return source

    # LOOP WHERE STUFF GETS DONE
    for current_page_number in range(1, 401):
        for gender in os.listdir(base_folder):
            all_tasks = []

            # check all products in the current page
            all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
            for product_specific_url in all_products_in_current_page:
                current_task = asyncio.create_task(get_product_information(product_specific_url))

                all_tasks.append(current_task)

            await gather_with_concurrency(random.randrange(8, 15), *all_tasks)

async def main():
    await download_all_product_information()

# just to make sure there are not any problems caused by two event loops
if asyncio.get_event_loop().is_running():  # only patch if needed (i.e. running in Notebook, Spyder, etc)
    import nest_asyncio

    nest_asyncio.apply()

# for asynchronous functionality
if __name__ == '__main__':
    asyncio.run(main())

What am I doing wrong? Thanks!


Solution

  • What is wrong is this line:

    current_task = asyncio.create_task(get_product_information(product_specific_url))
    

    When you create a "task" it is immediately scheduled for execution. As soon as your code yields control to the asyncio loop (at any "await" expression), asyncio starts running every task that has been scheduled.

    The semaphore, in the original snippet you pointed to, guards the point where each co-routine starts running, ensuring only "n" of them are active at a time. What is passed in to gather_with_concurrency in that snippet are co-routines, not tasks.

    Co-routines, unlike tasks, are objects that are ready to be awaited but are not yet scheduled. They can be passed around freely, just like any other object - they will only execute when they are either awaited or wrapped in a task (and, even then, only when the code passes control to the asyncio loop).

    In your code, you create the co-routine with the get_product_information call and immediately wrap it in a task. By the time the await expression inside gather_with_concurrency runs, all of those tasks are already executing.
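    Here is a minimal, self-contained sketch of the difference (the work co-routine is hypothetical, just to make the scheduling visible):

    import asyncio

    async def work(i):
        print(f'work {i} started')
        await asyncio.sleep(0.1)
        return i

    async def main():
        coro = work(1)                       # a co-routine object: nothing runs yet
        task = asyncio.create_task(work(2))  # scheduled for execution right away

        await asyncio.sleep(0)               # yield to the loop: 'work 2 started' prints here
        print('about to await the co-routine')
        await coro                           # only now does 'work 1 started' print
        await task

    asyncio.run(main())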

    The fix is simple: do not create a task at this point - leave it to the code guarded by your semaphore to drive each co-routine. Add just the raw co-routines to your list:

    ...
    all_coroutines = []
    # check all products in the current page
    all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
    for product_specific_url in all_products_in_current_page:
        current_coroutine = get_product_information(product_specific_url)

        all_coroutines.append(current_coroutine)

    # awaited once, outside the loop, after every co-routine has been collected
    await gather_with_concurrency(random.randrange(8, 15), *all_coroutines)
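    To see the corrected pattern working end to end, here is a self-contained sketch (fake_request is a hypothetical stand-in for the real download):

    import asyncio
    import time

    async def gather_with_concurrency(n, *coroutines):
        semaphore = asyncio.Semaphore(n)

        async def sem_coro(coro):
            # the co-routine only starts once the semaphore is acquired
            async with semaphore:
                return await coro

        return await asyncio.gather(*(sem_coro(c) for c in coroutines))

    async def fake_request(i):
        print(f'{time.monotonic() - START:4.1f}s  request {i} started')
        await asyncio.sleep(1)
        return i

    async def main():
        # nine co-routines, at most three in flight: starts arrive in three waves
        print(await gather_with_concurrency(3, *(fake_request(i) for i in range(9))))

    START = time.monotonic()
    asyncio.run(main())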
    

    There is still an unrelated error in this code that will defeat the concurrency: you are making a synchronous call to time.sleep inside get_product_information. That call blocks the entire asyncio loop until the sleep is over - no other task can run in the meantime. The correct thing to do is to use await asyncio.sleep(...) instead.
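    A sketch of the helper with the asyncio.sleep fix applied (it still relies on the get_source_code_or_content helper from the question):

    import asyncio
    import random

    async def get_product_information(url_to_append):
        url = 'https://www.amazon.com.br' + url_to_append

        source = await get_source_code_or_content(url, should_render_javascript=True)

        # non-blocking pause: yields to the event loop instead of freezing it
        await asyncio.sleep(random.uniform(2, 5))

        return source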