I want to use kqueue to monitor files for changes. I can see how to use select.kqueue() in a threaded way.
I'm searching for a way to use it with asyncio. I may have missed something really obvious here. I know that Python uses kqueue for asyncio on macOS. I'm happy for any solution to work only when the kqueue selector is in use.
So far the only way I can see to do this is to create a thread that continually calls kqueue.control() and then injects the events into the loop with asyncio.loop.call_soon_threadsafe(). I feel like there should be a better way.
You can add the FD from the kqueue object as a reader to the event loop using loop.add_reader(). The event loop will then inform you when events are ready to collect.
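The add_reader() mechanism can be seen in isolation with a small sketch. To keep it runnable on platforms without kqueue, a pipe stands in for the kqueue descriptor here; that substitution, and the names wait_until_readable and demo, are assumptions made for illustration only.

```python
import asyncio
import os


async def wait_until_readable(fd: int) -> None:
    """Suspend the caller until the event loop reports fd as readable."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_readable() -> None:
        # The callback can fire more than once; only resolve the future once
        if not future.done():
            future.set_result(None)

    loop.add_reader(fd, on_readable)
    try:
        await future
    finally:
        # Unregister even if the wait is cancelled
        loop.remove_reader(fd)


async def demo() -> bytes:
    # A pipe stands in for kqueue.fileno() (assumption for portability)
    read_fd, write_fd = os.pipe()
    try:
        loop = asyncio.get_running_loop()
        # Make the read end readable shortly after we start waiting
        loop.call_later(0.01, os.write, write_fd, b"x")
        await wait_until_readable(read_fd)
        return os.read(read_fd, 1)
    finally:
        os.close(read_fd)
        os.close(write_fd)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real kqueue, the descriptor passed in would be kqueue.fileno(), and the callback would collect events with a non-blocking kqueue.control() call instead of os.read().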
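Two aspects of doing this might seem odd to those familiar with kqueue: changes are submitted in a separate non-blocking control() call rather than in the same call that waits for events, and timeouts are handled by asyncio.wait_for() rather than by kqueue itself.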
There are more efficient ways to write this, but here's an example of how to completely replace select.kqueue.control with an async method (here named kqueue_control):
    import asyncio
    import select
    from typing import Iterable, List, Optional

    async def kqueue_control(kqueue: select.kqueue,
                             changes: Optional[Iterable[select.kevent]],
                             max_events: int,
                             timeout: Optional[float]) -> List[select.kevent]:
        def receive_result():
            # The loop may still call this after a timeout cancelled the future
            if future.done():
                return
            try:
                # Events are ready to collect; fetch them but do not block
                results = kqueue.control(None, max_events, 0)
            except Exception as ex:
                future.set_exception(ex)
            else:
                future.set_result(results)

        # If this call is non-blocking then just execute it
        if timeout == 0 or max_events == 0:
            return kqueue.control(changes, max_events, 0)

        # Apply the changes, but DON'T wait for events
        kqueue.control(changes, 0, 0)

        loop = asyncio.get_running_loop()
        future = loop.create_future()
        loop.add_reader(kqueue.fileno(), receive_result)
        try:
            if timeout is None:
                return await future
            return await asyncio.wait_for(future, timeout)
        except asyncio.TimeoutError:
            # Match select.kqueue.control, which returns no events on timeout
            return []
        finally:
            # Always unregister the reader, including on timeout or cancellation
            loop.remove_reader(kqueue.fileno())
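For the original goal of watching a file, the same idea can be sketched as a single coroutine. This is a minimal sketch rather than a definitive implementation: the name wait_for_write, the 5 second default timeout, the batch size of 8 and the choice of KQ_NOTE_WRITE | KQ_NOTE_EXTEND are all assumptions, and it only works on platforms where select.kqueue exists and the running loop supports add_reader().

```python
import asyncio
import os
import select
from typing import List


async def wait_for_write(path: str, timeout: float = 5.0) -> List["select.kevent"]:
    """Return the kevents delivered when `path` is next written to or extended."""
    fd = os.open(path, os.O_RDONLY)
    kq = select.kqueue()
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_ready() -> None:
        # Collect pending events without blocking; 8 is an arbitrary batch size
        if not future.done():
            future.set_result(kq.control(None, 8, 0))

    try:
        change = select.kevent(
            fd,
            filter=select.KQ_FILTER_VNODE,
            flags=select.KQ_EV_ADD | select.KQ_EV_CLEAR,
            fflags=select.KQ_NOTE_WRITE | select.KQ_NOTE_EXTEND,
        )
        # Register interest in the file, but do not wait for events here
        kq.control([change], 0, 0)
        loop.add_reader(kq.fileno(), on_ready)
        try:
            # Raises asyncio.TimeoutError if nothing happens in time
            return await asyncio.wait_for(future, timeout)
        finally:
            loop.remove_reader(kq.fileno())
    finally:
        kq.close()
        os.close(fd)
```

A caller would await wait_for_write("some/file") inside a running loop and inspect the fflags of the returned events to see what changed. For long-lived monitoring it would likely be better to keep one kqueue open and register many files on it rather than creating one per call.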