Tags: python-3.x, linux, file-descriptor

Monitor a file descriptor in the background with Python


As a follow-up to an earlier question of mine, I need to monitor a file descriptor in the background.

I have tried this:

import gpiod
import asyncio
import select

async def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    fd = line.event_get_fd()
    poll = select.poll()
    poll.register(fd)
    loop = asyncio.get_event_loop()
    while True:
        await loop.run_in_executor(None, poll.poll, None)
        event = line.event_read()
        print(event)

def main():
    asyncio.run(read())
    while True:
        # Use a separate name so the built-in input() is not shadowed
        text = input("Enter something: ")
        print(text)

main()

Execution blocks at the call to poll.poll. I have also tried this:

import gpiod
import asyncio

def readCb(line):
    print(f"{line.consumer}: {line.event_read()}")

def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    fd = line.event_get_fd()
    loop = asyncio.get_event_loop()
    loop.add_reader(fd, readCb, line)
    loop.run_forever()

def main():
    read()
    while True:
        # Use a separate name so the built-in input() is not shadowed
        text = input("Enter something: ")
        print(text)

main()

This also blocks inside the read function.

For the second example, I also tried to make read() async and call it with asyncio.run() but then the following error occurs:

  File "/usr/lib/python3.9/asyncio/base_events.py", line 578, in _check_running
    raise RuntimeError('This event loop is already running')

Is there any way to do this in Python other than real threads or subprocesses?

Many thanks for your help and best regards,

Cone


Solution

  • Despite what you might think from the name, poll.poll blocks until one of the registered fds is ready. It does not poll.

    Reading events from a line with libgpiod blocks if no event is present. So if you don't want it to block, first check that an event is available by testing whether the fd is readable.
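
A minimal sketch of that readability test, assuming Linux and using an os.pipe() as a stand-in for the fd returned by line.event_get_fd() so that it runs without GPIO hardware. The helper name fd_is_readable is hypothetical. Passing a timeout of 0 to poll.poll makes it return immediately instead of waiting:

```python
import os
import select

def fd_is_readable(fd: int) -> bool:
    """Return True if fd has data waiting, without blocking."""
    poller = select.poll()
    poller.register(fd, select.POLLIN)
    # A timeout of 0 ms makes poll() return immediately.
    return bool(poller.poll(0))

# Stand-in for line.event_get_fd(): the read end of a pipe becomes
# readable once something is written to the write end.
read_fd, write_fd = os.pipe()

before = fd_is_readable(read_fd)   # nothing written yet
os.write(write_fd, b"edge")
after = fd_is_readable(read_fd)    # data is now waiting

# Only read when the check says the read will not block.
payload = os.read(read_fd, 4) if after else b""

os.close(read_fd)
os.close(write_fd)
```

With the real line, the same check would guard the call to line.event_read().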

    If the other code that you want to run is also I/O bound on an fd then you can have the poll wait for events from either. That is traditional async style.
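
As a sketch of that style, the asyncio event loop can watch several fds at once with loop.add_reader and call back whichever becomes readable first. The two pipes below are stand-ins (assumptions, so the example runs anywhere on Linux): one for the GPIO event fd and one for user input. The names watch and demo are hypothetical:

```python
import asyncio
import os

async def watch(fds: dict, expected: int) -> list:
    """Collect `expected` reads from the named fds, in arrival order."""
    loop = asyncio.get_running_loop()
    results = []
    done = loop.create_future()

    def on_readable(name: str, fd: int) -> None:
        # The loop calls this only when fd is readable, so the read
        # does not block.
        results.append((name, os.read(fd, 1024)))
        if len(results) >= expected and not done.done():
            done.set_result(None)

    for name, fd in fds.items():
        loop.add_reader(fd, on_readable, name, fd)
    try:
        await done
    finally:
        for fd in fds.values():
            loop.remove_reader(fd)
    return results

async def demo() -> list:
    gpio_r, gpio_w = os.pipe()   # stand-in for line.event_get_fd()
    user_r, user_w = os.pipe()   # stand-in for sys.stdin.fileno()
    loop = asyncio.get_running_loop()
    # Simulate user input first, then an edge event.
    loop.call_later(0.05, os.write, user_w, b"hello\n")
    loop.call_later(0.10, os.write, gpio_w, b"edge")
    try:
        return await watch({"gpio": gpio_r, "user": user_r}, expected=2)
    finally:
        for fd in (gpio_r, gpio_w, user_r, user_w):
            os.close(fd)

events = asyncio.run(demo())
```

In the question's program, the two registrations would presumably be the fd from line.event_get_fd() and sys.stdin.fileno(), with a single event loop serving both instead of a separate input() loop.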

    If your other code does not block on an fd then you have two options, threading or polling. With threading you run the libgpiod calls in one Python thread and your other code in another. With polling, you periodically check the line to see if an edge event has occurred. Polling is generally less efficient than threading, but if you consider Python threads to be "real threads" then the solution you are left with is polling.
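
For the threading option, a rough sketch might look like the following. It again uses a pipe as a stand-in for the GPIO event fd (an assumption, so it runs without hardware), and the function name monitor is hypothetical. The worker thread waits on the fd and hands whatever it reads to the main thread through a queue:

```python
import os
import queue
import select
import threading

def monitor(fd: int, events: queue.Queue, stop: threading.Event) -> None:
    """Wait for fd to become readable and pass what is read to the queue."""
    poller = select.poll()
    poller.register(fd, select.POLLIN)
    while not stop.is_set():
        # Wake up every 100 ms so the stop flag is noticed.
        if poller.poll(100):
            events.put(os.read(fd, 1024))

read_fd, write_fd = os.pipe()   # stand-in for line.event_get_fd()
events = queue.Queue()
stop = threading.Event()
worker = threading.Thread(target=monitor, args=(read_fd, events, stop), daemon=True)
worker.start()

# The main thread is free for other work; here it simulates an edge
# event and then picks the result up from the queue.
os.write(write_fd, b"edge")
received = events.get(timeout=2)

# Shut the worker down cleanly before closing the fds.
stop.set()
worker.join(timeout=2)
os.close(read_fd)
os.close(write_fd)
```

In the question's program, the main thread would run the input() loop while the worker calls line.event_read() whenever the fd is readable.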