
Python Trio set up a decimal number of workers


I'm working with trio to run asynchronous concurrent tasks that will do some web scraping on different websites. I'd like to be able to choose how many concurrent workers the tasks are divided among. To do so I've written this code:

async def run_task():
    s = trio.Session(connections=5)
    Total_to_check = to_check() / int(module().workers)
    line = 0
    if int(Total_to_check) < 1:
        Total_to_check = 1
        module().workers = int(to_check())
    for i in range(int(Total_to_check)):
        try:
            async with trio.open_nursery() as nursery:
                for x in range(int(module().workers)):
                    nursery.start_soon(python_worker, self, s, x, line)
                    line += 1
        except BlockingIOError as e:
            print("[Fatal Error]", str(e))
            continue

In this example to_check() is equal to how many urls are given to fetch data from, and module().workers is equal to how many concurrent workers I'd like to use.

So if I had, let's say, 30 urls and asked for 10 concurrent tasks, it would fetch data from 10 urls concurrently and repeat the procedure 3 times.

Now this is all well and good until Total_to_check (the number of urls divided by the number of workers) is not a whole number. If I have, let's say, 15 urls and I ask for 10 workers, then this code will only check 10 urls. Same if I've got 20 urls but ask for 15 workers. I could do something like math.ceil(Total_to_check), but then it'll start trying to check urls that don't exist.
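To make the arithmetic concrete, here is a small sketch of the truncation problem and of batch boundaries clamped to the url count. The helper name batch_bounds is hypothetical and not part of the code above; it only assumes workers is at least 1.

```python
import math


def batch_bounds(total_urls, workers):
    """Return (start, stop) index pairs that cover every url exactly once.

    Hypothetical helper: the last batch is clamped to total_urls, so it
    may be shorter than the others.
    """
    return [
        (start, min(start + workers, total_urls))
        for start in range(0, total_urls, workers)
    ]


total_urls, workers = 15, 10

# Truncating gives 1 full batch, so only 10 of the 15 urls are visited.
truncated_batches = int(total_urls / workers)

# Rounding up gives 2 full batches, i.e. 20 indices for only 15 urls.
rounded_up_batches = math.ceil(total_urls / workers)

# Clamping the last batch covers indices 0..9 and then 10..14.
bounds = batch_bounds(total_urls, workers)
```

With 15 urls and 10 workers the clamped version yields two batches, the second one holding only the remaining 5 indices.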

How could I make this work properly, so that if I have, let's say, 10 concurrent tasks and 15 urls, it'll check the first 10 concurrently and then the last 5 concurrently, without skipping urls (or trying to check too many)?

Thanks!


Solution

  • Well, this is what trio.CapacityLimiter is for. Pass the same limiter to every worker and acquire it inside the worker, like this:

    async def python_worker(self, session, workers, line, limit):
        async with limit:
            ...
    

    Then you can simplify your run_task: start one task per url and let the limiter cap how many of them run at the same time:

    async def run_task():
        limit = trio.CapacityLimiter(10)
        s = trio.Session(connections=5)
        line = 0
        async with trio.open_nursery() as nursery:
            for x in range(int(to_check())):
                nursery.start_soon(python_worker, self, s, x, line, limit)
                line += 1      
    

    I believe the BlockingIOError handling would have to move inside python_worker too, because nursery.start_soon() doesn't block. It only schedules the task; it's the __aexit__ of the nursery that automagically waits for all the tasks at the end of the async with trio.open_nursery() as nursery block.