Due to my question here, I have to monitor a file-descriptor in the background.
I have tried this:
import gpiod
import asyncio
import select

async def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    fd = line.event_get_fd()
    poll = select.poll()
    poll.register(fd)
    loop = asyncio.get_event_loop()
    while True:
        await loop.run_in_executor(None, poll.poll, None)
        event = line.event_read()
        print(event)

def main():
    asyncio.run(read())
    while True:
        input = input("Enter something: ")
        print(input)

main()
Execution blocks at the call to poll.poll. I have also tried this:
import gpiod
import asyncio

def readCb(line):
    print(f"{line.consumer}: {line.event_read()}")

def read():
    name = "GPOUT"
    line = chip.get_line(INPUT)
    line.request(consumer=name, type=gpiod.LINE_REQ_EV_BOTH_EDGES, flags=gpiod.LINE_REQ_FLAG_BIAS_PULL_UP)
    fd = line.event_get_fd()
    loop = asyncio.get_event_loop()
    loop.add_reader(fd, readCb, line)
    loop.run_forever()

def main():
    read()
    while True:
        input = input("Enter something: ")
        print(input)

main()
This also blocks, inside the read function.
For the second example, I also tried making read() async and calling it with asyncio.run(), but then the following error occurs:
  File "/usr/lib/python3.9/asyncio/base_events.py", line 578, in _check_running
    raise RuntimeError('This event loop is already running')
Is there any way to do this in Python, other than real threads and subprocesses?
Many thanks for your help and best regards,
Cone
Despite what you might think from the name, poll.poll blocks until one of the registered fds is ready when it is called without a timeout, as in your code. It does not poll.
Reading events from a line with libgpiod also blocks if no event is present.
So if you don't want it to block, check that there is an event available first, by testing that the fd is readable.
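As a rough sketch of that readability test: passing a timeout of 0 to poll.poll makes it return immediately, so it can be used as a non-blocking check. The helper name fd_is_readable is made up for illustration, and a pipe stands in for the fd that line.event_get_fd() would return, so that the snippet runs without GPIO hardware.

```python
import os
import select

def fd_is_readable(fd, timeout_ms=0):
    """Return True if fd has data waiting within timeout_ms milliseconds."""
    poller = select.poll()
    poller.register(fd, select.POLLIN)
    # A timeout of 0 makes poll() return immediately instead of blocking.
    return bool(poller.poll(timeout_ms))

# A pipe stands in for the fd from line.event_get_fd() (assumption for the demo).
read_fd, write_fd = os.pipe()
before = fd_is_readable(read_fd)   # nothing has been written yet
os.write(write_fd, b"x")
after = fd_is_readable(read_fd)    # data is now waiting
os.close(read_fd)
os.close(write_fd)
```

With libgpiod you would presumably call line.event_read() only when the check returns True, so the read itself never has to wait.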
If the other code that you want to run is also I/O bound on an fd, then you can have the poll wait for events from either. That is the traditional async style.
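A minimal sketch of waiting on either of two fds with a single poll object might look like this. The helper wait_for_any and the variable names are hypothetical, and two pipes stand in for the GPIO line fd and the other input source; in the question's case the other source could plausibly be sys.stdin.fileno().

```python
import os
import select

def wait_for_any(fds, timeout_ms=None):
    """Block until at least one fd is readable; return the list of ready fds."""
    poller = select.poll()
    for fd in fds:
        poller.register(fd, select.POLLIN)
    # poll() returns (fd, event_mask) pairs for every fd that is ready.
    return [fd for fd, _mask in poller.poll(timeout_ms)]

# Two pipes stand in for the GPIO fd and the other I/O source (assumption).
gpio_r, gpio_w = os.pipe()
other_r, other_w = os.pipe()

os.write(other_w, b"hello")                      # only the second source has data
ready = wait_for_any([gpio_r, other_r], 1000)    # wait at most one second

for fd in (gpio_r, gpio_w, other_r, other_w):
    os.close(fd)
```

The caller then dispatches on which fd came back: read the line event for the GPIO fd, or handle the other input for the second one.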
If your other code does not block on an fd, then you have two options: threading or polling. With threading you run the libgpiod calls in one Python thread and your other code in another. With polling, you periodically check the line to see if an edge event has occurred. Polling is generally less efficient than threading, but if you consider Python threads to be "real threads" then the solution you are left with is polling.
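For comparison, the threading option could be sketched roughly as follows: a background thread waits on the fd and hands whatever it reads to the main thread through a queue. The function watch_fd and the short poll timeout are illustrative choices, and a pipe again stands in for the libgpiod line, where os.read would be replaced by line.event_read().

```python
import os
import queue
import select
import threading

def watch_fd(fd, events, stop, timeout_ms=100):
    """Wait for fd to become readable and push what is read onto a queue."""
    poller = select.poll()
    poller.register(fd, select.POLLIN)
    while not stop.is_set():
        # The short timeout lets the thread notice the stop flag.
        if poller.poll(timeout_ms):
            events.put(os.read(fd, 64))  # with libgpiod: line.event_read()

# A pipe stands in for the fd from line.event_get_fd() (assumption for the demo).
read_fd, write_fd = os.pipe()
events = queue.Queue()
stop = threading.Event()
worker = threading.Thread(target=watch_fd, args=(read_fd, events, stop), daemon=True)
worker.start()

# The main thread stays free for other work, such as prompting the user.
os.write(write_fd, b"edge")
received = events.get(timeout=2)

stop.set()
worker.join()
os.close(read_fd)
os.close(write_fd)
```

The polling option needs no thread at all: the main loop would make a zero-timeout readability check on the fd between its other tasks.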