python, asynchronous, beautifulsoup, aiohttp, python-asyncio

Python: How to use asyncio with huge CSV files to send asynchronous requests from a loop?


I want to go through a huge list of URLs and send requests to them asynchronously. Since the CSV file with the URLs is too big to load at once, I would like to read it row by row, and each time a row is loaded it should start a request and save the result to a file.

My problem is that, if I understand it correctly, asyncio.gather requires all tasks to be collected at once.

It would be great if you could tell me how to change my code so that it sends an asynchronous request for each row of the CSV file.

Here is the code I am stuck with:

import asyncio
import aiohttp
import async_timeout

import csv

async def fetch( session, url ):
    async with async_timeout.timeout(10):
        try:
            async with session.get(url) as response:
                return response
        except Exception as e:
            print(str( e ))
            return False

async def write_result( result ):
    with open( 'results.csv', 'a' ) as csv_file:
        writer = csv.writer( csv_file )
        writer.writerow( result )

async def validate_page( session, url ):
    response = await fetch( session, url )
    await write_result( response )

async def main():
    async with aiohttp.ClientSession() as session:
        with open('urls.csv') as csv_file:
            for row in csv.reader( csv_file ):
                await validate_page( session, row )

loop = asyncio.get_event_loop()
loop.run_until_complete(main())


Solution

  • To process each line of the CSV file asynchronously, use the following approach.

    A bunch of optimizations and restructuring of your current approach:

    • no need to create a csv.reader for the input file if it contains only a URL on each line - just iterate over the file object directly
    • no need to wrap the request in an additional async with async_timeout.timeout(10), as aiohttp itself has a timeout option (per request, or session-wide; see the sketch after this list)
    • definitely no need to construct a new writer = csv.writer( csv_file ) for each processed URL and its result - create the writer object just once, and guard writes with a single shared asyncio.Lock (see below)
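
    For reference, the session-wide timeout mentioned in the second point would look roughly like this - a minimal sketch, assuming a total timeout of 10 seconds and an example URL (both values are illustrative):

    import asyncio
    import aiohttp

    async def check():
        # one ClientTimeout configured on the session applies to every request made through it
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get('https://example.com') as response:
                print(response.status)

    asyncio.run(check())

    Here is the restructured code in full: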

    import asyncio
    import aiohttp
    
    import csv
    
    async def fetch(session, url):
        try:
            async with session.get(url, timeout=10) as response:
                return await response.text()
        except Exception as e:
            print(url, str(e))
            return False
    
    
    async def write_result(result, writer, lock):
        async with lock:   # single shared lock so concurrent tasks don't interleave rows
            res = [<needed parts from result>]  # <- adjust: build the list of values to write
            writer.writerow(res)


    async def validate_page(session, url, writer, lock):
        res = await fetch(session, url)
        if res:
            await write_result(res, writer, lock)


    async def main():
        lock = asyncio.Lock()  # create the lock once and share it between all tasks
        async with aiohttp.ClientSession() as session:
            with open('urls.csv') as csv_in, open('results.csv', 'a') as csv_out:
                writer = csv.writer(csv_out, delimiter=',')
                aws = [validate_page(session, url.strip(), writer, lock) for url in csv_in]
                await asyncio.gather(*aws)
                print('!--- finished processing')
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
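
    Note that the gather-based version above still builds one coroutine per line of the file before anything runs, so for a really huge urls.csv you may want to keep only a bounded window of work in memory. Below is a minimal sketch of one way to do that with an asyncio.Queue and a fixed pool of worker tasks; NUM_WORKERS, QUEUE_MAXSIZE and the row written per URL are illustrative assumptions, not values from the original code:

    import asyncio
    import aiohttp

    import csv

    NUM_WORKERS = 20      # illustrative - tune to how many concurrent requests you want
    QUEUE_MAXSIZE = 100   # bounds how many pending URLs are held in memory at once

    async def fetch(session, url):
        try:
            async with session.get(url, timeout=10) as response:
                return await response.text()
        except Exception as e:
            print(url, str(e))
            return False

    async def worker(queue, session, writer, lock):
        while True:
            url = await queue.get()
            try:
                res = await fetch(session, url)
                if res:
                    async with lock:               # serialize writes to the shared writer
                        writer.writerow([url])     # <- adjust: write the parts of res you need
            finally:
                queue.task_done()

    async def main():
        queue = asyncio.Queue(maxsize=QUEUE_MAXSIZE)
        lock = asyncio.Lock()
        async with aiohttp.ClientSession() as session:
            with open('urls.csv') as csv_in, open('results.csv', 'a') as csv_out:
                writer = csv.writer(csv_out)
                workers = [asyncio.create_task(worker(queue, session, writer, lock))
                           for _ in range(NUM_WORKERS)]
                for line in csv_in:                 # the file is streamed line by line
                    await queue.put(line.strip())   # blocks while the queue is full
                await queue.join()                  # wait until every queued URL is processed
                for w in workers:
                    w.cancel()
                await asyncio.gather(*workers, return_exceptions=True)

    asyncio.run(main())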