Search code examples
pythonasynchronouspython-asynciogpiozero

Use asyncio with a non-asynchronous callback method from an external library


I'm using the gpiozero python library to handle simple GPIO devices on a Raspberry Pi (I use here a MotionSensor for the example):

import asyncio
from gpiozero import MotionSensor


class MotionSensorHandler:
    """Wire a gpiozero ``MotionSensor`` to an async "motion detected" callback.

    NOTE: ``whenMotion`` is a coroutine function, yet it is assigned to
    ``when_motion`` as if it were a plain callable.  Nothing in this class
    awaits the coroutine it produces when the sensor fires.
    """

    # Class-level default; each instance overrides it in __init__.
    __whenMotionCallback = None

    def __init__(self, pin, whenMotionCallback):
        """Create the sensor on ``pin`` and register the motion hook.

        Args:
            pin: Pin identifier passed straight to ``MotionSensor``.
            whenMotionCallback: Async function (no arguments) to await when
                motion is detected.
        """
        # Remember the user-supplied async function.
        self.__whenMotionCallback = whenMotionCallback

        # Build the gpiozero sensor and point its motion hook at our method.
        sensor = MotionSensor(pin)
        sensor.when_motion = self.whenMotion

    async def whenMotion(self):
        """Await the user-supplied async callback once."""
        callback = self.__whenMotionCallback
        await callback()

My problem here is that I tried to give an async function as the callback to motionSensor.when_motion.

So I get the error that the whenMotion function is async but was never awaited, and I can't actually await it:

# will not work because MotionSensor() is not using asyncio
motionSensor.when_motion = await self.whenMotion

Do you have any idea how I can assign my async function to a non-async callback?


Solution

  • After some research I found that I have to create a new asyncio event loop in order to execute asynchronous code from a non-asynchronous method. So now my whenMotion() method is no longer async; instead, it executes an async method using ensure_future().

    import asyncio
    from gpiozero import MotionSensor
    
    
    class MotionSensorHandler():
        """Bridge a gpiozero ``MotionSensor`` to an async motion callback.

        ``when_motion`` is given a plain synchronous method (``whenMotion``).
        That method runs the user's async callback to completion on a
        short-lived event loop of its own.
        """

        # Class-level default; each instance overrides it in __init__.
        __whenMotionCallback = None

        def __init__(self, pin, whenMotionCallback):
            """Create the sensor on ``pin`` and register the motion hook.

            Args:
                pin: Pin identifier passed straight to ``MotionSensor``.
                whenMotionCallback: Async function (no arguments) to run
                    each time motion is detected.
            """
            # whenMotionCallback is an async function
            self.__whenMotionCallback = whenMotionCallback

            # Just init the sensor with gpiozero lib.  Keep it on the
            # instance so the sensor object lives as long as this handler
            # instead of being dropped when __init__ returns.
            self._motionSensor = MotionSensor(pin)

            # Method to call when motion is detected
            self._motionSensor.when_motion = self.whenMotion

        def whenMotion(self):
            """Run the async callback to completion from synchronous code.

            Blocks until the callback finishes.  Any exception raised by
            the callback propagates to the caller.
            """
            # Create new asyncio loop.  It is passed explicitly to
            # ensure_future() rather than installed with set_event_loop(),
            # so the calling thread's current loop is left untouched.
            loop = asyncio.new_event_loop()
            try:
                # Execute async method
                future = asyncio.ensure_future(
                    self.__executeWhenMotionCallback(), loop=loop
                )
                loop.run_until_complete(future)
            finally:
                # Always release the loop, even when the callback raised.
                loop.close()

        async def __executeWhenMotionCallback(self):
            """Await the user-supplied async callback once."""
            await self.__whenMotionCallback()