Tags: python, python-asyncio, aiohttp

error using shared TCPConnector in aiohttp


I've been trying to use a connection pool in aiohttp without much luck. My code makes repeated requests to a few servers, and I don't want to recreate the connections on every request. Here is some code that reproduces the problem; it fails with the error "Timeout context manager should be used inside a task":

import asyncio
import logging
from asyncio import Future
from typing import Dict, List

from aiohttp import ClientTimeout, ClientSession, TCPConnector


class UrlService:
    def __init__(self):
        self.connector: TCPConnector = TCPConnector()

    async def _fetch(self, session:ClientSession, url:str):
        logging.info('requesting data from %s', url)
        async with session.get(url) as response:
            data = await response.text()
            logging.info('received data from %s', url)
            if response.status != 200:
                # The body was already read into `data` above, so reuse it.
                return f'non 200 status for {url}: {data.strip()}'

            return data

    async def _make_requests(self, urls: List[str]) -> List[str]:

        async with ClientSession(timeout=ClientTimeout(10),
                                 connector=self.connector,
                                 connector_owner=False) as session:
            coroutines = []
            for url in urls:
                coroutines.append(self._fetch(session, url))

            return await asyncio.gather(*coroutines)

    def download(self, urls: List[str]) -> Dict[str, str]:
        responses = asyncio.run(self._make_requests(urls))
        return dict(zip(urls, responses))

if __name__ == '__main__':
    url_service = UrlService()
    search_urls = ['https://google.com', 'https://yahoo.com', 'https://bing.com']
    data = url_service.download(search_urls)

    for url, resp in data.items():
        print(f'****** {url} ******')
        print(f'  resp len: {len(resp)}')

Solution

  • Based on this issue from aiobotocore, I think the problem is that you are creating the TCPConnector() outside of any running event loop. When it is constructed, TCPConnector picks up an event loop of its own and stays associated with it. Then asyncio.run() creates a new event loop to run _make_requests, which tries to use a TCPConnector that is associated with a different loop. This mismatch seems to be the cause of the error.

    When I move the self.connector = TCPConnector() line into the body of _make_requests(), the issue goes away.
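
As a minimal sketch of that change (not necessarily the only way to do it), the class from the question could look like the following. It assumes the session should own the connector, so connector_owner is left at its default and the connector is closed together with the session; the question's code passed connector_owner=False instead.

```python
import asyncio
from typing import Dict, List

from aiohttp import ClientSession, ClientTimeout, TCPConnector


class UrlService:
    async def _fetch(self, session: ClientSession, url: str) -> str:
        async with session.get(url) as response:
            text = await response.text()
            if response.status != 200:
                return f'non 200 status for {url}: {text.strip()}'
            return text

    async def _make_requests(self, urls: List[str]) -> List[str]:
        # Build the connector inside the coroutine, so it is created while
        # the loop started by asyncio.run() is already running.
        connector = TCPConnector()
        # Assumption: the session owns the connector (the default), so
        # leaving the "async with" block closes both of them.
        async with ClientSession(timeout=ClientTimeout(total=10),
                                 connector=connector) as session:
            coroutines = [self._fetch(session, url) for url in urls]
            return await asyncio.gather(*coroutines)

    def download(self, urls: List[str]) -> Dict[str, str]:
        # Each call starts a fresh event loop, and with it a fresh connector.
        responses = asyncio.run(self._make_requests(urls))
        return dict(zip(urls, responses))
```

Calling UrlService().download(search_urls) works as in the question. Note that the pool in this sketch only lives for one download() call, because every asyncio.run() call creates and closes its own event loop. To reuse connections across calls, the session would probably need to be created once inside a single long-running loop and kept open for as long as that loop runs.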