Search code examples
pythontransition

How do I use timeout in transitions to exit a state and stop further execution of on_enter_state function


I'm trying to build what I hope is a fairly simple state machine to control a program attached to an LCD display and push button. I have a callback called buttonPressedCallback to transition between states and I'd like a timeout to halt the current LCD display message after a given time if the button is not pressed. I thought I had it figured out, but it doesn't seem to respond as I expect. The button callback i haven't played with really, but just while experimenting with the timeout functionality I noticed that my code quit a lot sooner than expected. I want to use this as a much more sophisticated state machine for other projects so I need to get the basics right.

Here is my code:

from transitions import Machine
from transitions.extensions.states import add_state_features, Timeout
from gpiozero import Button
import time

BUTTON_PIN = 18


@add_state_features(Timeout)
class CustomStateMachine(Machine):
    pass


class simpleMachine(object):

    states = [{'name': 'dummy', 'timeout': 5, 'on_timeout': 'timeoutTransition'},
              {'name': 'start', 'timeout': 5, 'on_timeout': 'timeoutTransition'},
              'waiting']

    def __init__(self, button):

        self.button = button

        self.machine = CustomStateMachine(model=self, states=simpleMachine.states, initial='dummy')
        self.machine.add_transition(trigger='buttonPressCallback', source='start', dest='waiting')
        self.machine.add_transition(trigger='buttonPressCallback', source='waiting', dest='start')
        self.machine.add_transition('timeoutTransition', '*', 'waiting')

        self.button.when_pressed = self.buttonPressCallback

    def on_enter_start(self):
        self.printState()
        print("doing 'things' for 15 secs, timeout should happen first")
        time.sleep(15)
        print("Start state time.sleep() ended")
        print("Spent %s seconds in start state" % (time.time() - start_time))

    def on_enter_dummy(self):
        self.printState()

    def on_enter_waiting(self):
        self.printState()
        print("Nothing happens here, just waiting")
        while True:
            time.sleep(1)
        print("Waiting state time.sleep() ended")

    def printState(self):
        print("Entered state {}".format(self.state))


if __name__ == "__main__":

    start_time = time.time()

    btn = Button(pin=BUTTON_PIN, bounce_time=0.1)

    testMachine = simpleMachine(btn)

    print("State Machine started")

    testMachine.to_start()

    print("Program ran for %s seconds" % (time.time() - start_time))

Here is what I expect to happen:

  1. testMachine starts in "dummy" state
  2. testMachine is explicitly moved into "start" state with the call to testMachine.to_start()
  3. start state is doing "something" for 15 seconds but timeout fires after 5 seconds and calls the timeoutTransition
  4. timeout transition moves testMachine to the waiting state
  5. waiting continues indefinitely, waiting for the button press to fire the transition that would move it back to the start state

What actually happens:

(.env) dietpi@DietPi:~/rgb_clock$ sudo -E .env/bin/python test.py
State Machine started
Entered state start
doing 'things' for 15 secs, timeout should happen first
Entered state waiting
Nothing happens here, just waiting
Start state time.sleep() ended
Spent 15.149317979812622 seconds in start state
Program ran for 15.153512001037598 seconds
(.env) dietpi@DietPi:~/rgb_clock$ ```

I'm expecting this is something to do with asyncio and threading but I had hoped that transitions and timeout would take care of that for me.

Any thoughts on why it's not performing as expected very welcome and advise on how to actually implement the functionality I'm looking for (still using transitions as I hope to use this for a more complicated project that would be very hard to track/read with lots of if/else/while statements.


Solution

  • Basically, everything you described is actually happening. I guess what confuses you is that your implicit step "3a" (callback on_enter_start is canceled and the main thread stops sleeping) isn't happening. Additionally, the the fact that timeout threads are daemon threads lead to the second problem which is that your program just exits when on_enter_start is done.

    I modified your example a bit and use DEBUG logging to get all steps that are actually happening here. transitions uses logging quite extensively. So it's a good idea to turn on logging if things don't work as expected. For productive execution INFO is usually enough.

    from transitions import Machine
    from transitions.extensions.states import add_state_features, Timeout
    import time
    import logging
    
    
    @add_state_features(Timeout)
    class CustomStateMachine(Machine):
        pass
    
    
    class SimpleMachine(object):
    
        states = [{'name': 'dummy', 'timeout': 5, 'on_timeout': 'timeoutTransition'},
                  {'name': 'start', 'timeout': 5, 'on_timeout': 'timeoutTransition'},
                  'waiting']
    
        def __init__(self):
            self.machine = CustomStateMachine(model=self, states=SimpleMachine.states, initial='dummy')
            self.machine.add_transition('timeoutTransition', '*', 'waiting')
    
        def on_enter_start(self):
            print("doing 'things' for 15 secs, timeout should happen first")
            time.sleep(15)
            print("Start state time.sleep() ended")
            print("Spent %s seconds in start state" % (time.time() - start_time))
    
        def on_enter_waiting(self):
            print("Nothing happens here, just waiting")
            while True:
                time.sleep(1)
            print("Waiting state time.sleep() ended")
    
    
    if __name__ == "__main__":
        logging.basicConfig(level=logging.DEBUG)
        start_time = time.time()
        test_machine = SimpleMachine()
        print("State Machine started")
        test_machine.to_start()
        print("Program ran for %s seconds" % (time.time() - start_time))
        assert test_machine.state == 'waiting'
    

    Logging output:

    State Machine started
    doing 'things' for 15 secs, timeout should happen first
    DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
    DEBUG:transitions.core:Initiating transition from state dummy to state start...
    DEBUG:transitions.core:Executed callbacks before conditions.
    DEBUG:transitions.core:Executed callback before transition.
    DEBUG:transitions.core:Exiting state dummy. Processing callbacks...
    INFO:transitions.core:Exited state dummy
    DEBUG:transitions.core:Entering state start. Processing callbacks...
    # This is where on_enter_start is called and will block due to time.sleep
    DEBUG:transitions.extensions.states:Timeout state start. Processing callbacks...
    # The next event is the timeout be triggered (in a Thread!) and timeout callbacks
    # will be processed (timeoutTransition)
    DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
    DEBUG:transitions.core:Initiating transition from state start to state waiting...
    DEBUG:transitions.core:Executed callbacks before conditions.
    DEBUG:transitions.core:Executed callback before transition.
    DEBUG:transitions.core:Exiting state start. Processing callbacks...
    # state start is left! 
    INFO:transitions.core:Exited state start
    DEBUG:transitions.core:Entering state waiting. Processing callbacks...
    # state waiting is entered. Your callback on_enter_waiting will be executed in
    # the Timeout thread and block there
    Nothing happens here, just waiting
    Start state time.sleep() ended
    Spent 15.001700162887573 seconds in start state
    # in your main thread your on_enter_start callback is now done
    Program ran for 15.001909732818604 seconds
    INFO:transitions.core:Executed callback 'on_enter_start'
    INFO:transitions.core:Entered state start
    DEBUG:transitions.core:Executed callback after transition.
    DEBUG:transitions.core:Executed machine finalize callbacks
    # The program will exit since timeout threads are daemon threads.
    # the reason is that waiting timeouts do not block a program's exit
    Process finished with exit code 0
    

    So, how to deal with this. Right now, I can think of three different attempts:

    1. Organize your heavy processing in threads

    • make sure callbacks do not block event processing of the state machine
    • run heavy processing in threads/a single working thread.
    • suggestion: make your processing threads check for a flag regurlarly, so they can gracefully exit when they should be exited

    If your sensor readings block permantently and you cannot prevent it. You could try to use multiprocessing to kill callback without the need of a flag to check...

    from transitions import Machine
    from transitions.extensions.states import add_state_features, Timeout
    import time
    import logging
    import threading
    
    
    @add_state_features(Timeout)
    class CustomStateMachine(Machine):
        pass
    
    
    class SimpleMachine(object):
    
        states = [{'name': 'dummy', 'timeout': 5, 'on_timeout': 'timeoutTransition'},
                  {'name': 'start', 'timeout': 5, 'on_timeout': 'timeoutTransition'},
                  'waiting']
    
        def __init__(self):
            self.running = False  # our flag which will tell threads whether they should exit
            self.current_job = None  # where we save the running thread for joining
            self.machine = CustomStateMachine(model=self, states=SimpleMachine.states, initial='dummy')
            self.machine.add_transition('timeoutTransition', '*', 'waiting')
    
        def change_jobs(self, func):
            if self.current_job:
                self.running = False
                self.current_job.join()  # wait until job and thread exits
            self.running = True
            self.current_job = threading.Thread(target=func)
            self.current_job.daemon = False  # depends on your use case
            self.current_job.start()
    
        def on_enter_start(self):
            self.change_jobs(self.do_start_things)
    
        def do_start_things(self):
            print("doing 'things' for 15 secs, timeout should happen first")
            counter = 0
            start_time = time.time()
            while self.running and counter < 15:
                print("work work")
                time.sleep(1)
                counter += 1
            print("Spent %s seconds in start state" % (time.time() - start_time))
    
        def waiting(self):
            while self.running:
                print("wait for input")
                time.sleep(1)
    
        def on_enter_waiting(self):
            self.change_jobs(self.waiting)
    
    
    if __name__ == "__main__":
        #logging.basicConfig(level=logging.DEBUG)
        test_machine = SimpleMachine()
        print("State Machine started")
        test_machine.to_start()
        while True:
            time.sleep(1)  # make sure your main thread isnt exiting
    

    2. Do work in 'internal' transitions (e.g. before callbacks) and trigger a 'heartbeat' events

    What happens in heartbeat depends on the current state. In my opinion this will result in a cleaner experience than a having to rely on threads. Important: DO NOT BLOCK in callbacks but timeout for instance stale reading operations.

    from transitions import Machine
    from transitions.extensions.states import add_state_features, Timeout
    import time
    import logging
    
    
    @add_state_features(Timeout)
    class CustomStateMachine(Machine):
        pass
    
    
    class SimpleMachine(object):
    
        states = [{'name': 'dummy', 'timeout': 5, 'on_timeout': 'timeoutTransition'},
                  {'name': 'start', 'timeout': 5, 'on_timeout': 'timeoutTransition'},
                  'waiting']
    
        def __init__(self):
            self.running = False
            self.current_job = None
            self.machine = CustomStateMachine(model=self, states=SimpleMachine.states, initial='dummy')
            self.machine.add_transition('timeoutTransition', '*', 'waiting')
            self.machine.add_transition(trigger='tick', source='start', dest=None, before='start_tick')
            self.machine.add_transition(trigger='tick', source='waiting', dest=None, before='waiting_tick')
    
        def start_tick(self):
            print("work work")
    
        def waiting_tick(self):
            print("wait for input")
    
    
    if __name__ == "__main__":
        #logging.basicConfig(level=logging.DEBUG)
        test_machine = SimpleMachine()
        print("State Machine started")
        test_machine.to_start()
        while True:
            time.sleep(1)
            test_machine.tick()
    

    Use AsyncMachine to cancel tasks when transitionting away from states

    asyncio.wait_for will explicitely cancel a task when a timeout happens. If you have a collection of tasks running in state, AsyncMachine will cancel them when changing states, even without asyncio timeouts. This requires transitions > 0.8 and Python > 3.7. Note that AsyncMachine is a rather new addition to transitions.

    from transitions.extensions.asyncio import AsyncMachine
    import asyncio
    import logging
    
    
    class SimpleMachine(object):
    
        states = ['dummy', 'start', 'waiting']
    
        def __init__(self):
            self.machine = AsyncMachine(model=self, states=SimpleMachine.states, initial='dummy')
            self.machine.add_transition('run', 'dummy', 'start')
            self.machine.add_transition('timeoutTransition', '*', 'waiting')
    
        async def doing_things(self):
            while True:
                print("work work")
                await asyncio.sleep(1)
    
        async def on_enter_start(self):
            try:
                await asyncio.wait_for(self.doing_things(), 5)
            except asyncio.TimeoutError:
                print("Timeout!")
                await self.timeoutTransition()
    
        async def on_enter_waiting(self):
            while True:
                print("wait for input")
                await asyncio.sleep(1)
    
    
    if __name__ == "__main__":
        # logging.basicConfig(level=logging.DEBUG)
        test_machine = SimpleMachine()
        print("State Machine started")
        asyncio.get_event_loop().run_until_complete(test_machine.run())