What I'm trying to achieve is something along the lines of spawning multiple parents and with each parent do some work and then spawn a few childs to check for other things and grab those results in the parent to do further work. I was also trying to make 2 different limits of spawn because the parent work can do more than the childs.
How would I accomplish this?
It works if I don't use limit2 but I would like to have the two limiters.
import trio
import asks
import time
import random
async def child(parent, i, sender, limit2):
async with limit2:
print('Parent {0}, Child {1}: started! Sleeping now...'.format(parent, i))
#await trio.sleep(random.randrange(0, 3))
print('Parent {0}, Child {1}: exiting!'.format(parent, i))
async with sender:
await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
async def parent(i, limit):
async with limit:
print('Parent {0}: started! Sleeping now...'.format(i))
#await trio.sleep(random.randrange(0, 3))
sender, receiver = trio.open_memory_channel(10)
limit2 = trio.CapacityLimiter(2)
async with trio.open_nursery() as nursery:
for j in range(10):
nursery.start_soon(child, i, j, sender, limit2)
async with receiver:
async for value in receiver:
print('Got value: {!r}'.format(value))
print('Parent {0}: exiting!'.format(i))
async def main():
limit = trio.CapacityLimiter(1)
async with trio.open_nursery() as nursery:
for i in range(1):
nursery.start_soon(parent, i, limit)
if __name__ == "__main__":
start_time = time.perf_counter()
trio.run(main)
duration = time.perf_counter() - start_time
print("Took {:.2f} seconds".format(duration))
When I run your code, I get:
File "/tmp/zigb.py", line 12, in child
await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_channel.py", line 157, in send
self.send_nowait(value)
File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_core/_ki.py", line 167, in wrapper
return fn(*args, **kwargs)
File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_channel.py", line 135, in send_nowait
raise trio.ClosedResourceError
trio.ClosedResourceError
What's happening here is that you're passing the sender
channel into all 10 of the child tasks, and then each child task is doing async with sender: ...
, which closes the sender
channel. So first one task uses it, and then closes it, and then the next task tries to use it... but it's already closed, so it gets an error.
Fortunately, Trio comes with a solution for exactly this problem: you can use the clone
method on a memory channel object to create a second copy of that memory channel that works exactly the same, but gets closed independently. So the trick is to pass each of the children a clone of sender
, and then they each close their clone, and then once all the clones are closed, the receiver gets notified and stops looping.
Fixed version of your code:
import trio
import asks
import time
import random
async def child(parent, i, sender, limit2):
async with limit2:
print('Parent {0}, Child {1}: started! Sleeping now...'.format(parent, i))
#await trio.sleep(random.randrange(0, 3))
print('Parent {0}, Child {1}: exiting!'.format(parent, i))
async with sender:
await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
async def parent(i, limit):
async with limit:
print('Parent {0}: started! Sleeping now...'.format(i))
#await trio.sleep(random.randrange(0, 3))
sender, receiver = trio.open_memory_channel(10)
limit2 = trio.CapacityLimiter(2)
async with trio.open_nursery() as nursery:
for j in range(10):
# CHANGED: Give each child its own clone of 'sender', which
# it will close when it's done
nursery.start_soon(child, i, j, sender.clone(), limit2)
# CHANGED: Close the original 'sender', once we're done making clones
await sender.aclose()
async with receiver:
async for value in receiver:
print('Got value: {!r}'.format(value))
print('Parent {0}: exiting!'.format(i))
async def main():
limit = trio.CapacityLimiter(1)
async with trio.open_nursery() as nursery:
for i in range(1):
nursery.start_soon(parent, i, limit)
if __name__ == "__main__":
start_time = time.perf_counter()
trio.run(main)
duration = time.perf_counter() - start_time
print("Took {:.2f} seconds".format(duration))