Search code examples
pythonmacospython-asynciokqueue

How to use kqueue for file monitoring in asyncio?


I want to use kqueue to monitor files for changes. I can see how to use select.kqueue() in a threaded way.

I'm searching for a way to use it with asyncio. I may have missed something really obvious here. I know that python uses kqueue for asyncio on macos. I'm happy for any solution to only work when kqueue selector is used.

So far the only way I can see to do this is create a thread to continually kqueue.control() from another thread and then inject the events in with asyncio.loop.call_soon_threadsafe(). I feel like there should be a better way.


Solution

  • You can add the FD from the kqueue objet as a reader to the control loop using loop.add_reader(). The control loop will then inform you events are ready to collect.

    There's two features of doing this which might be odd to those familiar with kqueue:

    • select.kqueue.control is a one-shot method which first changes the monitor and waits for new events to arrive. Because we don't ever want it to block, the two actions must be split into one non-blocking call to modify the monitor and a second, later, non-blocking call to collect the resulting events.
    • Because we don't ever want to block, the timeout can never be used. This can be re-implemented with asyncio.wait_for()

    There are more efficient ways to write this, but here's an example of how to completely replace select.kqueue.control with an async method (here named kqueue_control):

    async def kqueue_control(kqueue: select.kqueue,
                             changes: Optional[Iterable[select.kevent]],
                             max_events: int,
                             timeout: Optional[int]):
    
        def receive_result():
            try:
                # Events are ready to collect; fetch them but do not block
                results = kqueue.control(None, max_events, 0)
            except Exception as ex:
                future.set_exception(ex)
            else:
                future.set_result(results)
            finally:
                loop.remove_reader(kqueue.fileno())
                
        # If this call is non-blocking then just execute it
        if timeout == 0 or max_events == 0:
            return kqueue.control(changes, max_events, 0)
        
        # Apply the changes, but DON'T wait for events
        kqueue.control(changes, 0)
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        loop.add_reader(kqueue.fileno(), receive_result)
        if timeout is None:
            return await future
        else:
            return await asyncio.wait_for(future, timeout)