
Python and Trio, where producers are consumers, how to exit gracefully when the job is done?


I'm trying to make a simple web crawler using trio and asks. I use a nursery to start a couple of crawlers at once, and a memory channel to maintain a list of urls to visit.

Each crawler receives clones of both ends of that channel, so they can grab a url (via receive_channel), read it, find and add new urls to be visited (via send_channel).

import math

import trio


async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())


async def crawler(send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        content = await ...
        urls_found = ...
        for u in urls_found:
            await send_channel.send(u)  # I'm a producer too!

In this scenario the consumers are the producers. How to stop everything gracefully?

The conditions for shutting everything down are:

  • channel is empty
  • AND
  • all crawlers are stuck at the first for loop, waiting for the url to appear in receive_channel (which... won't happen anymore)

I tried using async with send_channel inside crawler() but could not find a good way to make it work. I also looked for a different approach (a worker pool bound to a memory channel, etc.), with no luck there either.


Solution

  • There are at least two problems here.

    The first is the assumption about stopping when the channel is empty. If the memory channel is allocated with a size of 0, it is always empty: a url can only be handed off when a crawler is ready to receive it.

    This creates problem number two. If you ever find more urls than you have allocated crawlers, your application will deadlock.

    The reason is that a crawler which cannot hand off all the urls it found stays blocked in send, waiting for another crawler to take one of them. While it is blocked, it is never ready to receive a new url itself.

    This gets even worse: if one of the other crawlers finds new urls, it gets stuck in the same way behind the crawler that is already waiting to hand off its urls. None of them will ever be able to take one of the urls that are waiting to be processed.

    Relevant portion of the documentation:

    https://trio.readthedocs.io/en/stable/reference-core.html#buffering-in-channels

    Assuming we fix that, where to go next?

    You probably need to keep a set of all visited urls, to make sure you don't visit them again.

    To actually figure out when to stop, instead of closing the channels, it is probably a lot easier to simply cancel the nursery.

    Let's say we modify the main loop like this:

    import math

    import trio


    async def main():
        send_channel, receive_channel = trio.open_memory_channel(math.inf)
        active_workers = trio.CapacityLimiter(3)  # Number of workers
        async with trio.open_nursery() as nursery:
            async with send_channel, receive_channel:
                nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
                nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
                nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
                while True:
                    await trio.sleep(1)  # Give the workers a chance to start up.
                    # Done when no worker holds a token and nothing is queued.
                    if active_workers.borrowed_tokens == 0 and send_channel.statistics().current_buffer_used == 0:
                        nursery.cancel_scope.cancel()  # All done!
    

    Now we need to modify the crawlers slightly, to pick up a token when active.

    async def crawler(active_workers, send_channel, receive_channel):
        async for url in receive_channel:  # I'm a consumer!
            async with active_workers:  # CapacityLimiter only supports "async with"
                content = await ...
                urls_found = ...
                for u in urls_found:
                    await send_channel.send(u)  # I'm a producer too!
    
    

    Other things to consider -

    You may want to use send_channel.send_nowait(u) in the crawler. Since you have an unbounded buffer, there is no chance of a WouldBlock exception, and not having a checkpoint on every send might be desirable. That way you know for sure that a particular url is fully processed and all its new urls have been added before other tasks get a chance to grab a new url, or the parent task gets a chance to check whether the work is done.