python · asynchronous · aiohttp

RuntimeError: Session is closed when trying to make async requests


First of all, here's the code:

import random
import asyncio
from aiohttp import ClientSession
import csv

headers =[]

def extractsites(file):
    sites = []
    with open(file, "r") as readfile:
        reader = csv.reader(readfile, delimiter=",")
        for a in reader:
            sites.append(a[1])
    return sites


async def fetchheaders(url, session):
    async with session.get(url) as response:
        responseheader = await response.headers
        print(responseheader)
        return responseheader


async def bound_fetch(sem, url, session):
    async with sem:
        print("doing request for "+ url)
        await fetchheaders(url, session)


async def run():
    urls = extractsites("cisco-umbrella.csv")
    tasks = []
    # create instance of Semaphore
    sem = asyncio.Semaphore(100)
    async with ClientSession() as session:
        for i in urls:
            task = asyncio.ensure_future(bound_fetch(sem, "http://"+i, session))
            tasks.append(task)
        return tasks

def main():
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(run())
    loop.run_until_complete(future)

if __name__ == '__main__':
    main()

Most of this code was taken from this blog post: https://pawelmhm.github.io/asyncio/python/aiohttp/2016/04/22/asyncio-aiohttp.html

Here is the problem I'm facing: I am trying to read a million URLs from a file and then make an async request for each of them. But when I execute the code above, I get RuntimeError: Session is closed.

This is my line of thought (I am relatively new to async programming, so bear with me): my thought process was to build a long task list in the run function (with the semaphore allowing only 100 parallel requests), and then pass it as a future to the event loop to execute.

I added a debug print in bound_fetch (which I copied from the blog post). It loops over all the URLs I have, and as soon as it should start making requests in the fetchheaders function, I get the runtime errors.

How do I fix my code?


Solution

  • A couple of things here.

    First, in your run function you actually want to gather the tasks and await them there. As written, run returns before any task has finished, so the async with ClientSession() block exits and closes the session while the requests are still pending — that is what raises the RuntimeError. Fix it like so:

    async def run():
        urls = ['google.com','amazon.com']
        tasks = []
        # create instance of Semaphore
        sem = asyncio.Semaphore(100)
        async with ClientSession() as session:
            for i in urls:
                task = asyncio.ensure_future(bound_fetch(sem, "http://"+i, session))
                tasks.append(task)
            await asyncio.gather(*tasks)
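    The same pattern can be exercised without a network connection. The sketch below is my own stand-in (fetch_stub is a placeholder for the aiohttp call, not part of the original code), and it uses asyncio.run, which has since superseded the get_event_loop/run_until_complete dance in main:

    ```python
    import asyncio

    async def fetch_stub(url):
        # Stand-in for session.get(...) so the sketch runs without a network.
        await asyncio.sleep(0)
        return {"Server": "example"}

    async def bound_fetch(sem, url):
        # The semaphore caps how many fetches run concurrently.
        async with sem:
            return await fetch_stub(url)

    async def run(urls):
        sem = asyncio.Semaphore(100)
        tasks = [asyncio.ensure_future(bound_fetch(sem, u)) for u in urls]
        # Awaiting gather keeps run() alive (and, in the real code, the
        # session open) until every task has finished.
        return await asyncio.gather(*tasks)

    urls = ["http://google.com", "http://amazon.com"]
    results = asyncio.run(run(urls))
    print(len(results))  # 2
    ```

    Because run only returns after gather completes, the session in the real version stays open for the full lifetime of the requests.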
    

    Second, response.headers is a plain attribute, not a coroutine, so you can't await it. The headers are already populated by the time session.get returns, so just read the attribute directly:

    async def fetchheaders(url, session):
        async with session.get(url) as response:
            responseheader = response.headers
            print(responseheader)
            return responseheader

    No body read is needed to get at the headers, which also avoids the overhead of downloading each page. If you only ever need headers and the servers cooperate, a HEAD request (session.head(url)) skips the body at the protocol level as well.
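    One more thought on scale: with a million URLs, building every task up front means holding a million Task objects in memory at once, even though the semaphore only lets 100 run at a time. A hedged alternative (again with a stub in place of the real aiohttp call, and illustrative names of my own) is to process the list in fixed-size batches:

    ```python
    import asyncio

    async def fetch_stub(url):
        # Placeholder for the real aiohttp request.
        await asyncio.sleep(0)
        return url

    async def run_in_batches(urls, batch_size=1000):
        results = []
        for start in range(0, len(urls), batch_size):
            batch = urls[start:start + batch_size]
            # Only batch_size tasks exist at any moment; gather preserves order.
            results.extend(await asyncio.gather(*(fetch_stub(u) for u in batch)))
        return results

    urls = ["http://site%d.example" % i for i in range(2500)]
    out = asyncio.run(run_in_batches(urls))
    print(len(out))  # 2500
    ```

    The semaphore still makes sense inside each batch if you want finer concurrency control; batching just bounds how much bookkeeping the event loop carries at once.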