How can I perform continuous / repetitive tasks in an AsyncMachine without ending in a deadlock?


I'm trying to set up an asynchronous state machine with transitions that can be controlled via MQTT using aiomqtt. I got a minimal working example running, which works as long as there are no repetitive actions:

Script for the state machine:

import asyncio
import aiomqtt
from transitions.extensions import AsyncMachine
import sys
import os
import logging
logging.basicConfig(level=logging.DEBUG)

if sys.platform.lower() == "win32" or os.name.lower() == "nt":
    from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
    set_event_loop_policy(WindowsSelectorEventLoopPolicy())

class MQTTStateMachine:
    states = ['init','A','B',{'name': 'stopped', 'final':True}]

    def __init__(self,client):
        self.client = client

        self.machine = AsyncMachine(model=self, states=MQTTStateMachine.states, initial='init')
        self.machine.add_transition(trigger='init', source='init', dest='A')
        self.machine.add_transition(trigger='stop', source=['A','B'], dest='stopped')

    async def update_state(self):
        await self.client.publish("MQTTstatemachine/machine/state", str(self.state))
    
    async def receiveMQTT(self):
        await self.client.subscribe("MQTTstatemachine/controller/transition")
        async for message in self.client.messages:
            if message.topic.matches("MQTTstatemachine/controller/transition"):
                await self.trigger(message.payload.decode())

    async def on_enter_A(self):
        await self.update_state()
        print("I'm now in state A.")
    
    async def on_enter_B(self):
        await self.update_state()
        print("I'm now in state B.")
    
    async def on_enter_stopped(self):
        await self.update_state()
        print("I'm now in state stopped.")

async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        MQTTmachine = MQTTStateMachine(client)
        await MQTTmachine.init()
        await asyncio.create_task(MQTTmachine.receiveMQTT())

if __name__ == "__main__":
    asyncio.run(main())

Controller script:

import asyncio
import aiomqtt
import sys
import os
if sys.platform.lower() == "win32" or os.name.lower() == "nt":
    from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
    set_event_loop_policy(WindowsSelectorEventLoopPolicy())

async def publishTransitions(client):
    # Publish one transition request every five seconds.
    await asyncio.sleep(5)
    await client.publish("MQTTstatemachine/controller/transition", "to_B")
    print("Transition: to_B")
    await asyncio.sleep(5)
    await client.publish("MQTTstatemachine/controller/transition", "to_A")
    print("Transition: to_A")
    await asyncio.sleep(5)
    await client.publish("MQTTstatemachine/controller/transition", "to_B")
    print("Transition: to_B")
    await asyncio.sleep(5)
    await client.publish("MQTTstatemachine/controller/transition", "stop")
    print("Transition: stop")
    await asyncio.sleep(5)

async def receiveStates(client):
    # Print every state reported by the state machine.
    await client.subscribe("MQTTstatemachine/machine/state")
    async for message in client.messages:
        if message.topic.matches("MQTTstatemachine/machine/state"):
            print(f"Statemachine now in state {message.payload.decode()}")

async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        tasks = [publishTransitions(client), receiveStates(client)]
        pending = [asyncio.create_task(t) for t in tasks]
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)

        # The publisher finishes first; cancel the listener that is still running.
        pendingTask = pending.pop()
        pendingTask.cancel()
        try:
            await pendingTask
        except asyncio.CancelledError:
            print("Finished.")

if __name__ == "__main__":
    asyncio.run(main())

I tried doing some repetitive action by replacing on_enter_B:

    async def on_enter_B(self):
        while self.is_B():
            await self.update_state()
            print("I'm now in state B.")
            await asyncio.sleep(1)

but then it gets stuck in state B and doesn't respond to state changes via MQTT anymore.

I tried implementing the repetitive task with a reflexive transition but that doesn't work either:

    async def on_enter_B(self):
        await self.update_state()
        print("I'm now in state B.")
        await asyncio.sleep(1)
        await self.to_B()

Solution

  • As far as I can tell the problem is here:

        async def receiveMQTT(self):
            await self.client.subscribe("MQTTstatemachine/controller/transition")
            async for message in self.client.messages:
                if message.topic.matches("MQTTstatemachine/controller/transition"):
                    await self.trigger(message.payload.decode())  # [1]
    

    self.trigger in [1] does not return while any callback is still running, so the async for loop never reaches the next message and receiveMQTT blocks itself.
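
    The effect can be reproduced without MQTT or transitions. The following is only a minimal sketch using plain asyncio: messages(), handle() and the seen list are hypothetical stand-ins for client.messages, self.trigger and the machine's callbacks, and the never-set asyncio.Event stands in for the while loop in on_enter_B.

```python
import asyncio

async def messages():
    # Stand-in for client.messages: two commands arrive one after the other.
    for payload in ("to_B", "stop"):
        yield payload

async def handle(payload, seen):
    # Stand-in for self.trigger(...): record the command, then run the "callback".
    seen.append(payload)
    if payload == "to_B":
        # Stand-in for a callback that loops while the state is B.
        await asyncio.Event().wait()  # the event is never set, so this never returns

async def receive_inline(seen):
    async for payload in messages():
        await handle(payload, seen)  # the loop is stuck here after "to_B"

async def demo_inline():
    seen = []
    try:
        # Give up after a short timeout; otherwise this would hang forever.
        await asyncio.wait_for(receive_inline(seen), timeout=0.2)
    except asyncio.TimeoutError:
        pass
    return seen

seen_inline = asyncio.run(demo_inline())
print(seen_inline)  # ['to_B'], the "stop" command is never read
```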

    Solution: Don't await trigger; wrap it in a task instead [2]. Keep references to the running tasks [3] so that they are not garbage-collected while still running (see the Python documentation of asyncio.create_task for details). When a new trigger arrives, AsyncMachine should cancel the running trigger. A cancelled task still calls its done callback, which you can use to remove the reference from your task collection [4]. In my example self.tasks is a set (see [5]).

        def __init__(self, client):
            self.client = client
            self.tasks = set()  # [5]
    
    # ...
    
        async def receiveMQTT(self):
            await self.client.subscribe("MQTTstatemachine/controller/transition")
            async for message in self.client.messages:
                if message.topic.matches("MQTTstatemachine/controller/transition"):
                    task = asyncio.create_task(self.trigger(message.payload.decode()))  # [2]
                    self.tasks.add(task)  # [3]
                    task.add_done_callback(self.tasks.discard)  # [4]
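
    To see the bookkeeping from [2] to [4] in isolation, here is a rough sketch with plain asyncio. Again, messages() and handle() are hypothetical stand-ins rather than part of aiomqtt or transitions, and the leftover task is cancelled by hand where AsyncMachine would be expected to cancel the running trigger.

```python
import asyncio

async def messages():
    # Stand-in for client.messages.
    for payload in ("to_B", "stop"):
        yield payload

async def handle(payload, seen):
    # Stand-in for self.trigger(...).
    seen.append(payload)
    if payload == "to_B":
        await asyncio.Event().wait()  # long-running callback, never returns by itself

async def main():
    seen, tasks = [], set()
    async for payload in messages():
        task = asyncio.create_task(handle(payload, seen))  # schedule, do not await
        tasks.add(task)                                    # keep a strong reference
        task.add_done_callback(tasks.discard)              # drop it once the task ends
    await asyncio.sleep(0.05)    # give the scheduled tasks a chance to run
    still_running = len(tasks)   # only the "to_B" handler is left
    for task in list(tasks):
        task.cancel()            # cancelled by hand in this sketch
    await asyncio.sleep(0.05)    # let the cancellation and done callbacks run
    return seen, still_running, len(tasks)

seen, still_running, remaining = asyncio.run(main())
print(seen, still_running, remaining)  # ['to_B', 'stop'] 1 0
```

    Both commands are read although the first handler never finishes, and the set is empty again once the cancelled task has run its done callback.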
    

    This, however, may delay state transitions: the trigger is only scheduled, not awaited, so the actual transition might not be done yet when the message has been handled. Right after client.publish("...", "to_B") the state could therefore still be A.
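
    The delay is a general property of asyncio.create_task and can be shown with a toy model. The Model class below is a hypothetical stand-in, not the transitions API; its to_B coroutine only mimics a transition that awaits something before changing the state.

```python
import asyncio

class Model:
    # Hypothetical stand-in for a state machine model.
    state = "A"

    async def to_B(self):
        await asyncio.sleep(0)  # stand-in for awaiting callbacks
        self.state = "B"

async def main():
    model = Model()
    task = asyncio.create_task(model.to_B())
    state_right_after = model.state  # the task has been scheduled but has not run yet
    await task                       # awaiting the task guarantees the change happened
    return state_right_after, model.state

result = asyncio.run(main())
print(result)  # ('A', 'B')
```

    If code depends on the new state, it has to await the task (or otherwise wait for the transition) before reading it.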

    Alternatively, there might be a way to tell aiomqtt not to wait for receiveMQTT to return.
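
    Another option, which is not part of the approach above and is only sketched here as an assumption, is to keep the callback itself short: start the repetitive work as a background task when entering B and cancel it when leaving B. The Worker class is a hypothetical model driven by hand; the method names on_enter_B and on_exit_B follow the naming convention transitions uses for state callbacks, and ticks is a stand-in for calling update_state().

```python
import asyncio

class Worker:
    """Hypothetical model; the callbacks are called by hand in this sketch."""

    def __init__(self):
        self.ticks = 0
        self._loop_task = None

    async def _repeat(self):
        while True:
            self.ticks += 1  # stand-in for update_state()
            await asyncio.sleep(0.01)

    async def on_enter_B(self):
        # Start the loop in the background and return immediately.
        self._loop_task = asyncio.create_task(self._repeat())

    async def on_exit_B(self):
        # Stop the loop when the state is left.
        if self._loop_task is not None:
            self._loop_task.cancel()
            try:
                await self._loop_task
            except asyncio.CancelledError:
                pass
            self._loop_task = None

async def main():
    worker = Worker()
    await worker.on_enter_B()   # returns at once, the loop keeps running
    await asyncio.sleep(0.05)
    await worker.on_exit_B()    # the loop is cancelled here
    ticks_at_exit = worker.ticks
    await asyncio.sleep(0.03)   # no further ticks should happen
    return ticks_at_exit, worker.ticks

ticks_at_exit, ticks_later = asyncio.run(main())
print(ticks_at_exit >= 1, ticks_later == ticks_at_exit)  # True True
```

    With this layout the trigger returns quickly, so awaiting it inside receiveMQTT would no longer stall the message loop.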