python, python-asyncio

Python asyncio not able to run the tasks


I am trying to test Python asyncio and aiohttp. The idea is to fetch data from an API in parallel and store each response as an .html file on a local drive. Below is my code.

import asyncio
import aiohttp
import time
import os

url_i = "<some_urls>-"
file_path = "<local_drive>\\asynciotest"

async def download_pep(pep_number: int) -> bytes:
  url = url_i + f"{pep_number}/"
  print(f"Begin downloading {url}")
  async with aiohttp.ClientSession() as session:
    async with session.get(url) as resp:
      content = await resp.read()
      print(f"Finished downloading {url}")
      return content
    
async def write_to_file(pep_number: int, content: bytes) -> None:
  with open(os.path.join(file_path,f"{pep_number}"+'-async.html'), "wb") as pep_file:
    print(f"{pep_number}_Begin writing ")
    pep_file.write(content)
    print(f"Finished writing")

async def web_scrape_task(pep_number: int) -> None:
  content = await download_pep(pep_number)
  await write_to_file(pep_number, content)

async def main() -> None:
  tasks = []
  for i in range(8010, 8016):
    tasks.append(web_scrape_task(i))
  await asyncio.wait(tasks)

if __name__ == "__main__": 
  s = time.perf_counter()
  asyncio.run(main())     
  elapsed = time.perf_counter() - s
  print(f"Execution time: {elapsed:0.2f} seconds.")

The above code throws the following error:

TypeError: Passing coroutines is forbidden, use tasks explicitly.
sys:1: RuntimeWarning: coroutine 'web_scrape_task' was never awaited

I am a complete novice with asyncio, so I am not getting any clues. I have gone through the documentation but still haven't figured it out.

Am I missing something here?

Edit

I am now trying to limit how many of the concurrent / parallel API calls run at once. For this I am using asyncio.Semaphore() and restricting the concurrency to 2. I got the clue from here and from the comments below.

I have revised the code as shown below:

async def web_scrape_task(pep_number: int) -> None:
  for i in range(8010, 8016):
    content = await download_pep(i)
    await write_to_file(pep_number, content)
    
## To limit concurrent calls to 2 ##
sem = asyncio.Semaphore(2)

async def main() -> None:
  tasks = []
  for i in range(8010, 8016):
    async with sem:
      tasks.append(asyncio.create_task(web_scrape_task(i)))
  await asyncio.gather(*tasks)

if __name__ == "__main__": 
  s = time.perf_counter()
  asyncio.run(main())
  #await main()
  elapsed = time.perf_counter() - s
  print(f"Execution time: {elapsed:0.2f} seconds.")

Now the question is: is this the correct approach?


Solution

  • The error occurs because you're passing raw coroutines to asyncio.wait() instead of scheduling them as tasks. All you have to do is wrap your web_scrape_task() call inside main() with asyncio.create_task(), like so:

    async def main() -> None:
        tasks = []
        for i in range(8010, 8016):
            tasks.append(asyncio.create_task(web_scrape_task(i)))
        await asyncio.wait(tasks)
    

    That way each coroutine is wrapped in an asyncio Task and correctly awaited. (Passing bare coroutines to asyncio.wait() was deprecated in Python 3.8 and raises this TypeError on Python 3.11+.)
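
    As a side note, asyncio.gather() wraps bare coroutines into Tasks for you, so an equivalent version of the same loop (a minimal sketch reusing the question's existing web_scrape_task) would be:

    async def main() -> None:
        # gather() accepts coroutines directly and schedules them as tasks
        coros = [web_scrape_task(i) for i in range(8010, 8016)]
        await asyncio.gather(*coros)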

    Hope this helps :)

    EDIT: Full Code

    w/ concurrent method calls & a single aiohttp ClientSession

    import asyncio
    import aiohttp
    import time
    import os
    
    url_i = "<some_urls>-"
    file_path = "<local_drive>\\asynciotest"
    
    async def download_pep(session: aiohttp.ClientSession, pep_number: int) -> bytes:
        url = url_i + f"{pep_number}/"
        print(f"Begin downloading {url}")
        async with session.get(url) as resp:
            content = await resp.read()
        print(f"Finished downloading {url}")
        return content
    
    async def write_to_file(pep_number: int, content: bytes) -> None:
        with open(os.path.join(file_path, f"{pep_number}-async.html"), "wb") as pep_file:
            print(f"{pep_number}_Begin writing ")
            pep_file.write(content)
            print(f"Finished writing")
    
    async def web_scrape_task(session: aiohttp.ClientSession, pep_number: int) -> None:
        content = await download_pep(session, pep_number)
        await write_to_file(pep_number, content)
    
    async def main() -> None:
        async with aiohttp.ClientSession() as session:
            tasks = [web_scrape_task(session, i) for i in range(8010, 8016)]
            await asyncio.gather(*tasks)
    
    if __name__ == "__main__": 
        s = time.perf_counter()
        asyncio.run(main())
        elapsed = time.perf_counter() - s
        print(f"Execution time: {elapsed:0.2f} seconds.")
    

    EDIT 2: Semaphore Handling

    Small modification for limiting the concurrency (is that a word?) to two.

    async def web_scrape_task(session: aiohttp.ClientSession, pep_number: int, semaphore: asyncio.Semaphore) -> None: # take in the semaphore as a parameter
        async with semaphore:  # synchronise with the semaphore here
            content = await download_pep(session, pep_number)
            await write_to_file(pep_number, content)
    
    async def main() -> None:
        semaphore = asyncio.Semaphore(2) # initialise a semaphore
        async with aiohttp.ClientSession() as session:
            tasks = [web_scrape_task(session, i, semaphore) for i in range(8010, 8016)] # pass semaphore as a prop
            await asyncio.gather(*tasks)
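
    Note that the semaphore is acquired inside web_scrape_task() itself; holding it in main() around asyncio.create_task() (as in the question's edit) doesn't limit anything, because create_task() returns immediately and the loop never actually waits there. If you happen to be on Python 3.11+, an equivalent main() using asyncio.TaskGroup (just a sketch, the gather() version above works the same) would be:

    async def main() -> None:
        semaphore = asyncio.Semaphore(2)
        async with aiohttp.ClientSession() as session:
            async with asyncio.TaskGroup() as tg:  # Python 3.11+
                for i in range(8010, 8016):
                    tg.create_task(web_scrape_task(session, i, semaphore))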