python-3.x python-trio

How to gather results and use limits with parent and child functions


I'm trying to spawn multiple parent tasks. Each parent does some work, then spawns a few children to check other things, and collects their results in the parent to do further work. I'd also like two separate spawn limits, because the parent work can handle more concurrency than the child work.

How would I accomplish this?

It works if I don't use limit2, but I would like to have both limiters.

import trio
import asks
import time
import random

async def child(parent, i, sender, limit2):
    async with limit2:
        print('Parent {0}, Child {1}: started! Sleeping now...'.format(parent, i))
        #await trio.sleep(random.randrange(0, 3))
        print('Parent {0}, Child {1}: exiting!'.format(parent, i))
        async with sender:
            await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))

async def parent(i, limit):
    async with limit:
        print('Parent {0}: started! Sleeping now...'.format(i))
        #await trio.sleep(random.randrange(0, 3))

        sender, receiver = trio.open_memory_channel(10)
        limit2 = trio.CapacityLimiter(2)
        async with trio.open_nursery() as nursery:
            for j in range(10):
                nursery.start_soon(child, i, j, sender, limit2)

        async with receiver:
            async for value in receiver:
                print('Got value: {!r}'.format(value))
        print('Parent {0}: exiting!'.format(i))

async def main():
    limit = trio.CapacityLimiter(1)
    async with trio.open_nursery() as nursery:
        for i in range(1):
            nursery.start_soon(parent, i, limit)


if __name__ == "__main__":
    start_time = time.perf_counter()
    trio.run(main)
    duration = time.perf_counter() - start_time
    print("Took {:.2f} seconds".format(duration))

Solution

  • When I run your code, I get:

      File "/tmp/zigb.py", line 12, in child
        await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
      File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_channel.py", line 157, in send
        self.send_nowait(value)
      File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_core/_ki.py", line 167, in wrapper
        return fn(*args, **kwargs)
      File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_channel.py", line 135, in send_nowait
        raise trio.ClosedResourceError
    trio.ClosedResourceError
    

    What's happening here is that you're passing the sender channel into all 10 of the child tasks, and then each child task is doing async with sender: ..., which closes the sender channel. So first one task uses it, and then closes it, and then the next task tries to use it... but it's already closed, so it gets an error.

    Fortunately, Trio comes with a solution for exactly this problem: you can use the clone method on a memory channel object to create a second copy of that memory channel that works exactly the same, but gets closed independently. So the trick is to pass each of the children a clone of sender, and then they each close their clone, and then once all the clones are closed, the receiver gets notified and stops looping.

    Docs: https://trio.readthedocs.io/en/stable/reference-core.html#managing-multiple-producers-and-or-multiple-consumers

    Fixed version of your code:

    import trio
    import time
    import random  # only needed if you re-enable the commented-out sleeps
    
    async def child(parent, i, sender, limit2):
        async with limit2:
            print('Parent {0}, Child {1}: started! Sleeping now...'.format(parent, i))
            #await trio.sleep(random.randrange(0, 3))
            print('Parent {0}, Child {1}: exiting!'.format(parent, i))
            async with sender:
                await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
    
    async def parent(i, limit):
        async with limit:
            print('Parent {0}: started! Sleeping now...'.format(i))
            #await trio.sleep(random.randrange(0, 3))
    
            sender, receiver = trio.open_memory_channel(10)
            limit2 = trio.CapacityLimiter(2)
            async with trio.open_nursery() as nursery:
                for j in range(10):
                    # CHANGED: Give each child its own clone of 'sender', which
                    # it will close when it's done
                    nursery.start_soon(child, i, j, sender.clone(), limit2)
            # CHANGED: Close the original 'sender', once we're done making clones
            await sender.aclose()
    
            async with receiver:
                async for value in receiver:
                    print('Got value: {!r}'.format(value))
            print('Parent {0}: exiting!'.format(i))
    
    async def main():
        limit = trio.CapacityLimiter(1)
        async with trio.open_nursery() as nursery:
            for i in range(1):
                nursery.start_soon(parent, i, limit)
    
    
    if __name__ == "__main__":
        start_time = time.perf_counter()
        trio.run(main)
        duration = time.perf_counter() - start_time
        print("Took {:.2f} seconds".format(duration))