uasyncio.create_task does not run coroutine (micropython)


Before I open an issue on GitHub, I wanted to check whether I am doing something wrong. This basic example should start a loop and stop it once the stop method is called. However, print("while loop") is never executed.

  • rp2040 Zero (Pico)
  • Thonny IDE
import uasyncio
import utime

class Test:
    def __init__(self):
        pass
    
    def run_loop(self):
        self.should_run = 1
        uasyncio.create_task(self._while_loop())

    async def _while_loop(self):
        print("while loop") # <--------- never gets called!
        counter = 0
        while self.should_run == 1:
            counter += 1
            print(counter)
            await uasyncio.sleep_ms(100)
    
    def stop(self):
        self.should_run = 0
        print("should stop now")

test = Test()
test.run_loop()
print("loop started.. wait..")
utime.sleep(3)
print("...waited..")
test.stop()

Solution

  • In a Python script, all code runs synchronously by default. uasyncio.create_task() only schedules a coroutine; nothing executes it until an event loop is running, and the blocking utime.sleep() in the question never hands control to one. To get async behavior, one has to create an async context. This is done via asyncio.run(), which starts the event loop and blocks until the coroutine passed to it has finished.

    As the MicroPython uasyncio docs say, a typical async application wraps its logic in an async context (e.g. a main() coroutine).

    This is how I solved the problem above:

    import uasyncio as asyncio
    import utime
    
    class Test:
        run = 0
        def __init__(self):
            pass
        
        def run_loop(self):
            self.run = 1
            asyncio.create_task(self._while_loop())
    
        async def _while_loop(self):
            print("while loop, should_run:", self.run)
            counter = 0
            while self.run == 1:
                counter += 1
                print(counter)
                await asyncio.sleep_ms(100)
    
        def stop(self):
            self.run = 0
            print("should stop now")
    
    async def main():
        test = Test()
        test.run_loop()
        print("loop started.. wait..")
        await asyncio.sleep(2)
        test.stop()
    
    asyncio.run(main())
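
A variation on the solution above, offered as a minimal sketch rather than the definitive fix: it keeps a reference to the task and awaits it in stop(), so the loop gets to finish its last iteration before main() returns. The events list, the task and counter attributes, and the async stop() are illustrative additions that are not in the original code. asyncio.sleep() with a float is used instead of sleep_ms() so the same file can also be tried on desktop Python, and the import fallback is there for the same reason.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # CPython fallback, only for trying the sketch on a desktop

events = []  # hypothetical log, used only to make the execution order visible


class Test:
    def __init__(self):
        self.should_run = False
        self.counter = 0
        self.task = None

    def run_loop(self):
        self.should_run = True
        # create_task() only schedules the coroutine. Its body does not run
        # until the caller yields to the event loop with an await.
        self.task = asyncio.create_task(self._while_loop())

    async def _while_loop(self):
        events.append("loop started")
        while self.should_run:
            self.counter += 1
            await asyncio.sleep(0.05)
        events.append("loop finished")

    async def stop(self):
        self.should_run = False
        # Awaiting the task lets the loop exit cleanly before we continue.
        await self.task


test = Test()


async def main():
    test.run_loop()
    events.append("task created")  # recorded before the coroutine body has run
    await asyncio.sleep(0.3)       # yields to the event loop, so the task runs
    await test.stop()
    events.append("stopped")


asyncio.run(main())
print(events, test.counter)
```

The order of the entries in events shows the point of the answer: "task created" is logged before "loop started", because the coroutine only begins executing once main() reaches its first await inside a running event loop.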