I have a file descriptor, and I would like to read from it with multiple tasks. Each read() request on the fd is going to return a full, independent packet of data (as long as data is available).
My naive implementation was to have each worker run the following loop:
async def work_loop(fd):
    while True:
        await trio.hazmat.wait_readable(fd)
        buf = os.read(fd, BUFSIZE)
        if not buf:
            break
        await do_work(buf)
Unfortunately, this does not work, because trio raises ResourceBusyError if multiple tasks block on the same fd.
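For illustration, here is a minimal standalone sketch of the failure. It assumes the trio API names of the time; in current releases trio.hazmat is trio.lowlevel and trio.ResourceBusyError is trio.BusyResourceError:

import os
import trio

async def demo():
    r, w = os.pipe()  # throwaway fd; nothing is ever written to it
    async with trio.open_nursery() as nursery:
        # First waiter parks inside wait_readable().
        nursery.start_soon(trio.hazmat.wait_readable, r)
        await trio.sleep(0.1)  # checkpoint, so the first waiter gets registered
        try:
            # Second waiter on the same fd.
            await trio.hazmat.wait_readable(r)
        except trio.ResourceBusyError:
            print('second wait_readable raised ResourceBusyError')
        nursery.cancel_scope.cancel()  # cancel the still-parked first waiter

trio.run(demo)

So my next iteration was to write a custom wait function: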
async def work_loop(fd):
    while True:
        await my_wait_readable(fd)
        buf = os.read(fd, BUFSIZE)
        if not buf:
            break
        await do_work(buf)
where
read_queue = trio.hazmat.ParkingLot()

async def my_wait_readable(fd):
    name = trio.hazmat.current_task().name
    while True:
        try:
            log.debug('%s: Waiting for fd to become readable...', name)
            await trio.hazmat.wait_readable(fd)
        except trio.ResourceBusyError:
            log.debug('%s: Resource busy, parking in read queue.', name)
            await read_queue.park()
            continue
        log.debug('%s: fd readable, unparking next task.', name)
        read_queue.unpark()
        break
However, in tests I get log messages like these:
2018-09-18 13:09:17.219 pyfuse3-worker-37: Waiting for fd to become readable...
2018-09-18 13:09:17.219 pyfuse3-worker-47: Waiting for fd to become readable...
2018-09-18 13:09:17.220 pyfuse3-worker-53: Waiting for fd to become readable...
2018-09-18 13:09:17.220 pyfuse3-worker-51: fd readable, unparking next task.
2018-09-18 13:09:17.220 pyfuse3-worker-51: doing work
2018-09-18 13:09:17.221 pyfuse3-worker-47: Resource busy, parking in read queue.
2018-09-18 13:09:17.221 pyfuse3-worker-37: Resource busy, parking in read queue.
2018-09-18 13:09:17.221 pyfuse3-worker-53: Resource busy, parking in read queue.
In other words: the ResourceBusyError from trio.hazmat.wait_readable does not seem to be raised when a task starts waiting, but only once the fd becomes readable, so the parking scheme still races with the wakeup.
What's the proper way to solve this problem?
Multiple readers from the same fd don't make sense; using Trio (or not) doesn't change that basic fact. Why are you trying to do that in the first place?
If for some reason you really do need multiple parallel tasks to post-process your data, use one reader task to add the data to a queue and let your processing tasks get their data from that, as sketched below.
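A minimal sketch of that pattern, using trio's memory channels (trio.open_memory_channel; when this was written, trio.Queue filled the same role). do_work, BUFSIZE, and the worker count are placeholders:

import os
import trio

BUFSIZE = 4096  # placeholder

async def read_loop(fd, send_channel):
    # The fd has exactly one reader, so wait_readable() never
    # sees a second waiter and never raises ResourceBusyError.
    async with send_channel:
        while True:
            await trio.hazmat.wait_readable(fd)
            buf = os.read(fd, BUFSIZE)
            if not buf:
                break
            await send_channel.send(buf)

async def process_loop(receive_channel):
    # Several workers may share one receive channel;
    # each packet is delivered to exactly one of them.
    async for buf in receive_channel:
        await do_work(buf)

async def main(fd, n_workers=4):
    send_channel, receive_channel = trio.open_memory_channel(0)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(read_loop, fd, send_channel)
        for _ in range(n_workers):
            nursery.start_soon(process_loop, receive_channel)

Since only the reader task ever touches the fd, the contention on wait_readable disappears entirely.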
Alternatively, you could use a lock:
read_lock = trio.Lock()

async def work_loop(fd):
    while True:
        async with read_lock:
            # Only one task at a time waits on the fd and reads a packet.
            await trio.hazmat.wait_readable(fd)
            buf = os.read(fd, BUFSIZE)
        if not buf:
            break
        await do_work(buf)  # processing happens outside the lock
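With the lock, only the wait-and-read step is serialized; do_work still runs concurrently across workers. Either way, the workers would be spawned from a nursery; a minimal (hypothetical) driver:

async def main(fd):
    async with trio.open_nursery() as nursery:
        for _ in range(4):  # arbitrary number of workers
            nursery.start_soon(work_loop, fd)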