
Combining semaphore and time limiting in python-trio with asks http request


I'm trying to use Python in an async manner in order to speed up my requests to a server. The server has a slow response time (often several seconds, but sometimes faster than a second), yet it handles parallel requests well. I have no access to this server and can't change anything about it. So, I have a big list of URLs (pages in the code below) which I know beforehand, and want to speed up their loading by making NO_TASKS = 5 requests at a time. On the other hand, I don't want to overload the server, so I want a minimum pause of 1 second between requests (i.e. a limit of 1 request per second).

So far I have successfully implemented the semaphore part (five requests at a time) using a Trio queue.

import asks
import time
import trio

NO_TASKS = 5


asks.init('trio')
asks_session = asks.Session()
queue = trio.Queue(NO_TASKS)
next_request_at = 0
results = []


pages = [
    'https://www.yahoo.com/',
    'http://www.cnn.com',
    'http://www.python.org',
    'http://www.jython.org',
    'http://www.pypy.org',
    'http://www.perl.org',
    'http://www.cisco.com',
    'http://www.facebook.com',
    'http://www.twitter.com',
    'http://www.macrumors.com/',
    'http://arstechnica.com/',
    'http://www.reuters.com/',
    'http://abcnews.go.com/',
    'http://www.cnbc.com/',
]


async def async_load_page(url):
    global next_request_at
    sleep = next_request_at
    next_request_at = max(trio.current_time() + 1, next_request_at)
    await trio.sleep_until(sleep)
    next_request_at = max(trio.current_time() + 1, next_request_at)
    print('start loading page {} at {} seconds'.format(url, trio.current_time()))
    req = await asks_session.get(url)
    results.append(req.text)


async def producer(url):
    await queue.put(url)  


async def consumer():
    while True:
        if queue.empty():
            print('queue empty')
            return
        url = await queue.get()
        await async_load_page(url)


async def main():
    async with trio.open_nursery() as nursery:
        for page in pages:
            nursery.start_soon(producer, page)
        await trio.sleep(0.2)
        for _ in range(NO_TASKS):
            nursery.start_soon(consumer)


start = time.time()
trio.run(main)

However, I'm missing the implementation of the limiting part, i.e. a maximum of 1 request per second. You can see my attempt above (the first five lines of async_load_page), but as the output below shows when you execute the code, it is not working:

start loading page http://www.reuters.com/ at 58097.12261669573 seconds
start loading page http://www.python.org at 58098.12367392373 seconds
start loading page http://www.pypy.org at 58098.12380622773 seconds
start loading page http://www.macrumors.com/ at 58098.12389389973 seconds
start loading page http://www.cisco.com at 58098.12397854373 seconds
start loading page http://arstechnica.com/ at 58098.12405119873 seconds
start loading page http://www.facebook.com at 58099.12458010273 seconds
start loading page http://www.twitter.com at 58099.37738939873 seconds
start loading page http://www.perl.org at 58100.37830828273 seconds
start loading page http://www.cnbc.com/ at 58100.91712723473 seconds
start loading page http://abcnews.go.com/ at 58101.91770178373 seconds
start loading page http://www.jython.org at 58102.91875295573 seconds
start loading page https://www.yahoo.com/ at 58103.91993155273 seconds
start loading page http://www.cnn.com at 58104.48031027673 seconds
queue empty
queue empty
queue empty
queue empty
queue empty

I've spent some time searching for answers but couldn't find any.


Solution

  • One of the ways to achieve your goal would be using a mutex acquired by a worker before sending a request and released in a separate task after some interval:

    import asks
    import trio
    from typing import Iterator

    asks.init('trio')


    async def fetch_urls(urls: Iterator, responses, n_workers, throttle):
        # Using a binary `trio.Semaphore` so that it can
        # be released from a separate task.
        mutex = trio.Semaphore(1)

        async def tick():
            # Release the mutex `throttle` seconds after it was acquired.
            await trio.sleep(throttle)
            mutex.release()

        async def worker():
            # Workers share the `urls` iterator, so each URL is taken exactly once.
            for url in urls:
                await mutex.acquire()
                nursery.start_soon(tick)
                response = await asks.get(url)
                responses.append(response)

        async with trio.open_nursery() as nursery:
            for _ in range(n_workers):
                nursery.start_soon(worker)


    If a worker gets a response sooner than throttle seconds have passed, it will block on await mutex.acquire(). Otherwise the mutex will already have been released by tick, and another worker will be able to acquire it.

    This is similar to how the leaky bucket algorithm works:

    • Workers waiting for the mutex are like water in a bucket.
    • Each tick is like a bucket leaking at a constant rate.

    If you add a bit of logging just before sending a request, you should get output similar to this:

       0.00169 started
      0.001821 n_workers: 5
      0.001833 throttle: 1
      0.002152 fetching https://httpbin.org/delay/4
         1.012 fetching https://httpbin.org/delay/2
         2.014 fetching https://httpbin.org/delay/2
         3.017 fetching https://httpbin.org/delay/3
          4.02 fetching https://httpbin.org/delay/0
         5.022 fetching https://httpbin.org/delay/2
         6.024 fetching https://httpbin.org/delay/2
         7.026 fetching https://httpbin.org/delay/3
         8.029 fetching https://httpbin.org/delay/0
         9.031 fetching https://httpbin.org/delay/0
         10.61 finished
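
  • An alternative (an editor's sketch, not part of the original answer) is to skip the mutex and have each task atomically reserve its own 1-second slot. The attempt in the question clumps requests because next_request_at is only ever advanced to trio.current_time() + 1, never past it, so several sleeping workers compute the same wake-up time. The reservation arithmetic, shown with a hypothetical helper reserve_slot:

```python
def reserve_slot(next_request_at, now):
    # Each caller atomically claims the earliest free 1-second slot.
    # In Trio this read-modify-write is safe as long as there is no
    # `await` between reading and updating the shared variable.
    slot = max(next_request_at, now)
    return slot, slot + 1

# Five tasks arriving at the same instant each get a distinct slot:
next_at = 0.0
slots = []
for _ in range(5):
    slot, next_at = reserve_slot(next_at, now=0.0)
    slots.append(slot)

print(slots)  # → [0.0, 1.0, 2.0, 3.0, 4.0]
```

    In async_load_page this would replace the first four lines with `slot, next_request_at = reserve_slot(next_request_at, trio.current_time())` followed by `await trio.sleep_until(slot)`.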