Search code examples
pythonpython-3.xpython-asyncioaiohttp

semaphore/multiple pool locks in asyncio for 1 proxy - aiohttp


I have 5,00,000 urls. and want to get response of each asynchronously.

import aiohttp
import asyncio    

@asyncio.coroutine
def worker(url):
    response = yield from aiohttp.request('GET', url, connector=aiohttp.TCPConnector(share_cookies=True, verify_ssl=False))
    body = yield from response.read_and_close()

    print(url)

def main():
    url_list = [] # lacs of urls, extracting from a file

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([worker(u) for u in url_list]))

main()

I want 200 connections at a time(concurrent 200), not more than this because

when I run this program for 50 urls it works fine, i.e url_list[:50] but if I pass whole list, i get this error

aiohttp.errors.ClientOSError: Cannot connect to host www.example.com:443 ssl:True Future/Task exception was never retrieved future: Task()

may be frequency is too much and server is denying to respond after a limit?


Solution

  • Yes, one can expect a server to stop responding after causing too much traffic (whatever the definition of "too much traffic") to it.

    One way to limit number of concurrent requests (throttle them) in such cases is to use asyncio.Semaphore, similar in use to these used in multithreading: just like there, you create a semaphore and make sure the operation you want to throttle is aquiring that semaphore prior to doing actual work and releasing it afterwards.

    For your convenience, asyncio.Semaphore implements context manager to make it even easier.

    Most basic approach:

    CONCURRENT_REQUESTS = 200
    
    
    @asyncio.coroutine
    def worker(url, semaphore):
        # Aquiring/releasing semaphore using context manager.
        with (yield from semaphore):
            response = yield from aiohttp.request(
                'GET',
                url,
                connector=aiohttp.TCPConnector(share_cookies=True,
                                               verify_ssl=False))
            body = yield from response.read_and_close()
    
            print(url)
    
    
    def main():
        url_list = [] # lacs of urls, extracting from a file
    
        semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS)
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait([worker(u, semaphore) for u in url_list]))