I want to go through a huge list of urls and send requests to them asynchronously. Since the CSV file with the urls is too big to load at once, I would like to read it row by row, and each time a row is loaded it should start a request and save the result to a file.
My problem is that, if I understood it correctly, asyncio.gather requires all tasks to be collected at once.
It would be great if you could tell me how to change my code so that it sends an asynchronous request for each row of the CSV file.
Here is the code I am stuck with:
import asyncio
import aiohttp
import async_timeout
import csv
async def fetch(session, url):
    async with async_timeout.timeout(10):
        try:
            async with session.get(url) as response:
                return response
        except Exception as e:
            print(str(e))
            return False

async def write_result(result):
    with open('results.csv', 'a') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(result)

async def validate_page(session, url):
    response = await fetch(session, url)
    await write_result(response)

async def main():
    async with aiohttp.ClientSession() as session:
        with open('urls.csv') as csv_file:
            for row in csv.reader(csv_file):
                await validate_page(session, row)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
To process each line of the csv file asynchronously, use the following approach.

A bunch of optimizations and restructuring of your current approach:

- no need for csv.reader on the input file if it contains only a url on each separate line (just traverse the file object itself)
- no need for async with async_timeout.timeout(10), as aiohttp.ClientSession itself has a timeout option (see the sketch right after this list)
- no need to create writer = csv.writer(csv_file) for each processed url (with its consequent result) - create the writer object just once, ensuring graceful writing to the shared file with an asyncio.Lock (see below)
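For example, assuming aiohttp 3.3+ (where aiohttp.ClientTimeout is available), the timeout can be configured once on the session instead of wrapping every request; the code below instead passes timeout=10 per request, which also works. A minimal sketch, with 10 seconds as an illustrative value:

import aiohttp

# one session-wide timeout instead of wrapping every request in async_timeout;
# ClientTimeout exists in aiohttp 3.3+
timeout = aiohttp.ClientTimeout(total=10)

async def main():
    async with aiohttp.ClientSession(timeout=timeout) as session:
        ...  # every request made through this session uses the 10 s timeout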
import asyncio
import aiohttp
import csv

async def fetch(session, url):
    try:
        async with session.get(url, timeout=10) as response:
            return await response.text()
    except Exception as e:
        print(url, str(e))
        return False

async def write_result(result, writer, lock):
    async with lock:  # lock shared by all tasks, so rows are written one at a time
        res = [<needed parts from result, >]  # <- adjust a resulting list of strings
        writer.writerow(res)

async def validate_page(session, url, writer, lock):
    res = await fetch(session, url)
    if res:
        await write_result(res, writer, lock)

async def main():
    lock = asyncio.Lock()  # create the lock once and share it between tasks
    async with aiohttp.ClientSession() as session:
        with open('urls.csv') as csv_in, open('results.csv', 'a') as csv_out:
            writer = csv.writer(csv_out, delimiter=',')
            aws = [validate_page(session, url.strip(), writer, lock) for url in csv_in]
            await asyncio.gather(*aws)
            print('!--- finished processing')

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
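Note that the gather-based version above still creates one coroutine per line before anything runs. If even that is too much for a very large file, one alternative (only a sketch, not part of the answer above; the worker/main names and the concurrency value of 20 are arbitrary) is a fixed pool of workers pulling urls from a bounded asyncio.Queue, so the file is read lazily and only a small number of urls are in memory at any moment:

import asyncio
import csv
import aiohttp

async def fetch(session, url):
    # same idea as fetch() in the answer above
    try:
        async with session.get(url, timeout=10) as response:
            return await response.text()
    except Exception as e:
        print(url, str(e))
        return False

async def worker(queue, session, writer, lock):
    # each worker pulls one url at a time from the queue until it is cancelled
    while True:
        url = await queue.get()
        try:
            res = await fetch(session, url)
            if res:
                async with lock:
                    writer.writerow([url])  # adjust to the columns you actually need
        finally:
            queue.task_done()

async def main(concurrency=20):
    # bounded queue: the producer below blocks while it is full,
    # so the input file is consumed lazily
    queue = asyncio.Queue(maxsize=concurrency * 2)
    lock = asyncio.Lock()
    async with aiohttp.ClientSession() as session:
        with open('urls.csv') as csv_in, open('results.csv', 'a') as csv_out:
            writer = csv.writer(csv_out)
            workers = [asyncio.ensure_future(worker(queue, session, writer, lock))
                       for _ in range(concurrency)]
            for line in csv_in:
                await queue.put(line.strip())  # waits while the queue is full
            await queue.join()  # wait until every queued url has been processed
            for w in workers:
                w.cancel()
            # let the cancelled workers finish cleanly
            await asyncio.gather(*workers, return_exceptions=True)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())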