Search code examples
python state-machine pyside2 pytransitions

How to properly combine PySide2 and pytransitions for implementing a state machine for GUI application


Background: I'd like to implement a GUI for controlling a bunch of clients (that talk to 'servers' controlling hardware like motors, cameras etc. via RPC calls) using PySide2.

Previous approach: Typically, what I'd do is to create my GUI and connect the UI signals to the Client slots and the other way round. This works perfectly fine for simpler applications.

Problem: I would like my GUI to properly represent which calls to the clients are currently allowed. The simplest example: after executing client1.doXY() I'd like to disable the button that executed that command and re-enable it only after doXY() has completed. While this is entirely possible with the approach above, it feels wrong when things get more complicated, e.g. when GUI elements depend on the state of multiple clients.

Approach: I therefore thought it'd be a good idea to use finite state machines as an intermediate layer between the clients and the GUI, and came across pytransitions, which looks very promising. However, I'm struggling to find the right way to combine those two worlds.

Questions:

  • Is this generally speaking a valid design approach to have such a layer ?

  • In particular, as shown in the working code example, I have to move the client to a separate thread to avoid freezing the GUI while the client is performing a blocking call. While my code works fine, it requires some overhead in creating additional Qt signals to connect the ClientState and the Client object. Can this be done more elegantly (i.e. without the additional xy_requested signals, but with some kind of direct call from the ClientState to the Client functions that still executes the Client function in the Client thread and not in the main thread)?

Working example:

running state machine

Code:

import io
import logging
from time import sleep

import numpy as np
from PySide2 import QtSvg, QtWidgets
from PySide2.QtCore import Signal, Slot, QObject, QThread
from PySide2.QtWidgets import QWidget, QPushButton, QApplication
from transitions.extensions import GraphMachine

# Configure the root logger so that messages of level DEBUG and above are emitted.
logging.basicConfig(level=logging.DEBUG)


class Client(QObject):
    """Stand-in for a hardware client whose calls may block.

    Outcomes are reported through the signals below instead of return values.
    """

    # Outcome notifications emitted by the slots of this class.
    sig_move_done = Signal()
    sig_disconnected = Signal()
    sig_connected = Signal()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @Slot(int)
    def client_move(self, dest):
        """Simulate a blocking move to ``dest`` and emit the outcome.

        Sleeps for three seconds, then emits ``sig_disconnected`` when the
        random draw is below 0.5 and ``sig_move_done`` otherwise.
        """
        print(f'Client moving to {dest}...')
        sleep(3)  # placeholder for a blocking call
        failed = np.random.rand() < 0.5
        if failed:
            print("Error occurred during movement...")
            self.sig_disconnected.emit()
            return
        print("Movement done...")
        self.sig_move_done.emit()

    @Slot()
    def client_disconnect(self):
        """Announce a disconnect (the actual work is left as a placeholder)."""
        self.sig_disconnected.emit()

    @Slot()
    def client_connect(self):
        """Announce a successful connect (the actual work is left as a placeholder)."""
        self.sig_connected.emit()


# define states, transitions and extra args for transitions state machine:
# State names handed to the state machine.
states = ['ready', 'moving', 'unknown']

# One entry per trigger: the trigger name, the state(s) it may fire from,
# and the state it leads to.
transitions = [
    dict(trigger='move', source='ready', dest='moving'),
    dict(trigger='stopped', source='moving', dest='ready'),
    dict(trigger='disconnect_', source=['ready', 'moving'], dest='unknown'),
    dict(trigger='error', source=['ready', 'moving'], dest='unknown'),
    dict(trigger='connect_', source='unknown', dest='ready'),
]

# Additional keyword arguments forwarded to the machine constructor.
extra_args = {
    'initial': 'unknown',
    'title': 'Simple state machine',
    'show_conditions': True,
    'show_state_attributes': True,
}


class ClientState(QObject):
    """State-machine layer sitting between a ``Client`` and the GUI.

    The client is moved to a worker thread owned by this object. Client
    signals are connected to the machine's triggers, and requests to the
    client are sent by emitting helper signals connected to the client's
    slots instead of calling those slots directly.

    Args:
        client: the client object to supervise; it must provide the signals
            ``sig_disconnected``, ``sig_connected`` and ``sig_move_done`` and
            the slots ``client_move`` and ``client_connect``.
        *args, **kwargs: forwarded to ``QObject.__init__``.
    """

    # Emitted after every state change so that views can refresh themselves.
    sig_update_available = Signal()
    # Helper signals used to hand a request over to the client's slots.
    sig_move_requested = Signal(int)
    sig_connect_requested = Signal()

    def __init__(self, client, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = client

        # Move the client to a separate thread so its blocking slots run there.
        self.worker_thread = QThread()
        self.client.moveToThread(self.worker_thread)
        self.worker_thread.start()

        # The worker thread must be stopped before it is destroyed; do so when
        # the application is about to quit (if an application object exists).
        app = QApplication.instance()
        if app is not None:
            app.aboutToQuit.connect(self.stop_worker)

        self.machine = GraphMachine(model=self, states=states, transitions=transitions,
                                    show_auto_transitions=False, **extra_args, after_state_change="update_available",
                                    send_event=True)

        # Client signals drive the state machine triggers.
        self.client.sig_disconnected.connect(self.disconnect_)
        self.client.sig_connected.connect(self.connect_)
        self.client.sig_move_done.connect(self.stopped)

        # Requests towards the client go through the helper signals.
        self.sig_move_requested.connect(self.client.client_move)
        self.sig_connect_requested.connect(self.client.client_connect)

    def update_available(self, *args, **kwargs):
        """Machine callback (``after_state_change``): announce the new state.

        Any arguments passed by the machine are accepted and ignored.
        """
        self.sig_update_available.emit()

    @Slot()
    def stop_worker(self):
        """Ask the worker thread to quit and block until it has finished.

        If the client is in the middle of a blocking call, this waits for that
        call to return first.
        """
        self.worker_thread.quit()
        self.worker_thread.wait()

    def on_enter_moving(self, event):
        """Machine callback on entering 'moving': request the move from the client.

        The destination is read from the ``dest`` keyword argument of the
        trigger call and defaults to 0.
        """
        print(event.kwargs)
        dest = event.kwargs.get('dest', 0)
        # Calling self.client.client_move() directly would execute it in the
        # calling thread; emitting the helper signal hands it to the client.
        self.sig_move_requested.emit(dest)

    def show_graph(self, **kwargs):
        """Render the machine graph with 'dot' and return the SVG data as bytes.

        ``kwargs`` are forwarded to ``get_graph``.
        """
        stream = io.BytesIO()
        self.get_graph(**kwargs).draw(stream, prog='dot', format='svg')
        return stream.getvalue()


class GUI(QWidget):
    """Window showing the state graph plus 'move' and '(re-)connect' buttons.

    Args:
        client_state: the state-machine object to display and control. It must
            provide ``sig_update_available``, the triggers ``move`` and
            ``connect_``, the predicates ``is_ready`` / ``is_unknown`` and
            ``show_graph()``.
    """

    def __init__(self, client_state):
        super().__init__()
        self.client_state = client_state

        # setup UI
        self.setWindowTitle("State")
        self.svgWidget = QtSvg.QSvgWidget()
        self.btn_move = QPushButton("move")
        self.btn_connect = QPushButton("(re-)connect")

        # Keep the layout in a local variable: storing it as ``self.layout``
        # would shadow the inherited QWidget.layout() method on this instance.
        layout = QtWidgets.QVBoxLayout()
        layout.addWidget(self.svgWidget)
        layout.addWidget(self.btn_move)
        layout.addWidget(self.btn_connect)
        self.setLayout(layout)

        # Connect Slots/Signals
        ## machine -> GUI
        self.client_state.sig_update_available.connect(self.update_gui)

        ## GUI --> machine
        self.btn_move.clicked.connect(lambda: self.client_state.move(dest=np.random.randint(1, 100)))
        # NOTE(review): this fires the 'connect_' trigger directly; the
        # client's client_connect slot is not invoked on this path.
        self.btn_connect.clicked.connect(
            self.client_state.connect_)

        # update UI
        self.update_gui()

    def update_gui(self):
        """Reload the state graph and enable the buttons valid in the current state.

        'move' is enabled only in state 'ready' and '(re-)connect' only in
        state 'unknown'; in state 'moving' both are disabled.
        """
        print("Update model graph and GUI...")
        self.svgWidget.load(self.client_state.show_graph())

        self.btn_move.setEnabled(self.client_state.is_ready())
        self.btn_connect.setEnabled(self.client_state.is_unknown())


if __name__ == "__main__":
    import sys

    # The application object has to exist before any widget is created.
    qt_app = QApplication(sys.argv)

    # Wire up client -> state machine -> window, then show the window.
    hw_client = Client()
    state = ClientState(hw_client)
    window = GUI(state)
    window.show()

    # Run the event loop and use its return code as the process exit status.
    exit_code = qt_app.exec_()
    sys.exit(exit_code)

Solution

  • Is this generally speaking a valid design approach to have such a layer?

    Yes, it is valid; in complex applications finite state machines are commonly used because they simplify the logic.


    With regard to simplification: IMHO it is preferable to first check whether Qt itself offers a similar tool, since such tools interact nicely with the other Qt elements through events or signals. In this case there are at least 2 options:

    The State Machine Framework:

    import time
    from functools import partial
    from PySide2 import QtCore, QtGui, QtWidgets
    import numpy as np
    
    
    class Client(QtCore.QObject):
        """Stand-in for a hardware client whose calls may block.

        Outcomes are reported through the signals below instead of return values.
        """

        # Outcome notifications emitted by the slots of this class.
        sig_move_done = QtCore.Signal()
        sig_disconnected = QtCore.Signal()
        sig_connected = QtCore.Signal()

        @QtCore.Slot(int)
        def client_move(self, dest):
            """Simulate a blocking move to ``dest`` and emit the outcome.

            Sleeps for three seconds, then emits ``sig_disconnected`` when the
            random draw is below 0.5 and ``sig_move_done`` otherwise.
            """
            print(f"Client moving to {dest}...")
            time.sleep(3)  # placeholder for a blocking call
            failed = np.random.rand() < 0.5
            if failed:
                print("Error occurred during movement...")
                self.sig_disconnected.emit()
                return
            print("Movement done...")
            self.sig_move_done.emit()

        @QtCore.Slot()
        def client_disconnect(self):
            """Announce a disconnect (the actual work is left as a placeholder)."""
            self.sig_disconnected.emit()

        @QtCore.Slot()
        def client_connect(self):
            """Announce a successful connect (the actual work is left as a placeholder)."""
            self.sig_connected.emit()
    
    
    class GUI(QtWidgets.QWidget):
        """Two-button window whose behaviour is driven by a ``QStateMachine``.

        A ``Client`` is created and moved to a worker thread. Button clicks and
        client signals are used as transitions between the states 'unknown',
        'ready' and 'moving'; entering a state updates the enabled buttons.

        Args:
            parent: optional parent widget.
        """

        # Emitted on entering the moving state. It is connected to the client's
        # client_move slot, so the call is delivered to the client object
        # instead of being executed by the caller. Handing a plain callable
        # such as functools.partial to QTimer.singleShot gives Qt no receiver
        # object, so the callable would run in the thread that scheduled it.
        sig_move_requested = QtCore.Signal(int)

        def __init__(self, parent=None):
            super().__init__(parent)
            self.setWindowTitle("State")

            self.btn_move = QtWidgets.QPushButton("move")
            self.btn_connect = QtWidgets.QPushButton("(re-)connect")

            self.client = Client()
            self._thread = QtCore.QThread(self)
            self._thread.start()
            self.client.moveToThread(self._thread)
            self.sig_move_requested.connect(self.client.client_move)

            lay = QtWidgets.QVBoxLayout(self)
            lay.addWidget(self.btn_move)
            lay.addWidget(self.btn_connect)
            self.resize(320, 120)

            # states
            self.unknown_state = QtCore.QState()
            self.ready_state = QtCore.QState()
            self.moving_state = QtCore.QState()

            # transitions
            self.ready_state.addTransition(self.btn_move.clicked, self.moving_state)
            self.moving_state.addTransition(self.client.sig_move_done, self.ready_state)
            self.ready_state.addTransition(self.client.sig_disconnected, self.unknown_state)
            self.moving_state.addTransition(self.client.sig_disconnected, self.unknown_state)
            self.unknown_state.addTransition(self.btn_connect.clicked, self.ready_state)
            self.unknown_state.addTransition(self.client.sig_connected, self.ready_state)

            # per-state UI updates
            self.unknown_state.entered.connect(self.on_unknown_state_enter)
            self.ready_state.entered.connect(self.on_ready_state_enter)
            self.moving_state.entered.connect(self.on_moving_state_enter)

            state_machine = QtCore.QStateMachine(self)
            state_machine.addState(self.ready_state)
            state_machine.addState(self.moving_state)
            state_machine.addState(self.unknown_state)

            state_machine.setInitialState(self.unknown_state)
            state_machine.start()

        def on_unknown_state_enter(self):
            """Entering 'unknown': only '(re-)connect' is available."""
            print("unknown_state")
            self.btn_move.setDisabled(True)
            self.btn_connect.setEnabled(True)

        def on_ready_state_enter(self):
            """Entering 'ready': only 'move' is available."""
            print("ready_state")
            self.btn_move.setEnabled(True)
            self.btn_connect.setDisabled(True)

        def on_moving_state_enter(self):
            """Entering 'moving': disable both buttons and request a move.

            The destination is a random integer in [1, 100).
            """
            print("moving_state")
            self.btn_move.setDisabled(True)
            self.btn_connect.setDisabled(True)
            dest = int(np.random.randint(1, 100))
            self.sig_move_requested.emit(dest)

        def closeEvent(self, event):
            """Stop the worker thread and wait for it before the window closes."""
            self._thread.quit()
            self._thread.wait()
            super().closeEvent(event)
    
    
    if __name__ == "__main__":
        import sys

        # The application object has to exist before any widget is created.
        qt_app = QtWidgets.QApplication(sys.argv)

        window = GUI()
        window.show()

        # Run the event loop and use its return code as the process exit status.
        exit_code = qt_app.exec_()
        sys.exit(exit_code)
    

    Qt SCXML:

    Simple_State_Machine.scxml

    <?xml version="1.0" encoding="UTF-8"?>
    <scxml xmlns="http://www.w3.org/2005/07/scxml" version="1.0" binding="early" xmlns:qt="http://www.qt.io/2015/02/scxml-ext" name="Simple_State_Machine" qt:editorversion="4.10.0" initial="unknown">
        <!-- Three states: "unknown" (the initial state), "ready" and "moving".
             The qt:editorinfo elements carry editor layout/colour data only. -->
        <qt:editorinfo initialGeometry="150.82;359.88;-20;-20;40;40"/>
        <!-- ready: event "move" leads to "moving", event "disconnect" to "unknown". -->
        <state id="ready">
            <qt:editorinfo stateColor="#ff974f" geometry="425.83;190.46;-60;-50;120;100" scenegeometry="425.83;190.46;365.83;140.46;120;100"/>
            <transition type="internal" event="move" target="moving">
                <qt:editorinfo endTargetFactors="35.02;9.52" movePoint="-34.84;14.59" startTargetFactors="32.33;90.16"/>
            </transition>
            <transition type="internal" event="disconnect" target="unknown">
                <qt:editorinfo endTargetFactors="91.87;60.92" movePoint="9.38;9.36" startTargetFactors="6.25;63.37"/>
            </transition>
        </state>
        <!-- unknown: event "connect" leads to "ready". -->
        <state id="unknown">
            <qt:editorinfo stateColor="#89725b" geometry="150.82;190.46;-60;-50;120;100" scenegeometry="150.82;190.46;90.82;140.46;120;100"/>
            <transition type="internal" target="ready" event="connect">
                <qt:editorinfo endTargetFactors="6.34;41.14" movePoint="0;7.30" startTargetFactors="91.13;39.41"/>
            </transition>
        </state>
        <!-- moving: event "disconnect" leads to "unknown", event "stopped" back to "ready". -->
        <state id="moving">
            <qt:editorinfo stateColor="#a508d0" geometry="425.83;344.53;-60;-50;120;100" scenegeometry="425.83;344.53;365.83;294.53;120;100"/>
            <transition type="internal" event="disconnect" target="unknown">
                <qt:editorinfo movePoint="2.08;17.72"/>
            </transition>
            <transition type="internal" event="stopped" target="ready">
                <qt:editorinfo endTargetFactors="68.30;90.08" movePoint="62.50;10.32" startTargetFactors="68.69;5.74"/>
            </transition>
        </state>
    </scxml>
    

    enter image description here

    import os
    import time
    from functools import partial
    from PySide2 import QtCore, QtGui, QtWidgets, QtScxml
    import numpy as np
    
    
    class Client(QtCore.QObject):
        """Stand-in for a hardware client whose calls may block.

        Outcomes are reported through the signals below instead of return values.
        """

        # Outcome notifications emitted by the slots of this class.
        sig_move_done = QtCore.Signal()
        sig_disconnected = QtCore.Signal()
        sig_connected = QtCore.Signal()

        @QtCore.Slot(int)
        def client_move(self, dest):
            """Simulate a blocking move to ``dest`` and emit the outcome.

            Sleeps for three seconds, then emits ``sig_disconnected`` when the
            random draw is below 0.5 and ``sig_move_done`` otherwise.
            """
            print(f"Client moving to {dest}...")
            time.sleep(3)  # placeholder for a blocking call
            move_failed = np.random.rand() < 0.5
            if move_failed:
                print("Error occurred during movement...")
                self.sig_disconnected.emit()
                return
            print("Movement done...")
            self.sig_move_done.emit()

        @QtCore.Slot()
        def client_disconnect(self):
            """Announce a disconnect (the actual work is left as a placeholder)."""
            self.sig_disconnected.emit()

        @QtCore.Slot()
        def client_connect(self):
            """Announce a successful connect (the actual work is left as a placeholder)."""
            self.sig_connected.emit()
    
    
    class GUI(QtWidgets.QWidget):
        """Two-button window driven by an SCXML state machine.

        The machine is loaded from ``Simple_State_Machine.scxml`` located next
        to this file. Button clicks and client signals are submitted to it as
        events, and the per-state slots update which buttons are enabled. A
        ``Client`` is created and moved to a worker thread.

        Args:
            parent: optional parent widget.
        """

        # Emitted on entering the moving state. It is connected to the client's
        # client_move slot, so the call is delivered to the client object
        # instead of being executed by the caller. Handing a plain callable
        # such as functools.partial to QTimer.singleShot gives Qt no receiver
        # object, so the callable would run in the thread that scheduled it.
        sig_move_requested = QtCore.Signal(int)

        def __init__(self, parent=None):
            super().__init__(parent)
            self.setWindowTitle("State")

            self.btn_move = QtWidgets.QPushButton("move")
            self.btn_connect = QtWidgets.QPushButton("(re-)connect")

            self.client = Client()
            self._thread = QtCore.QThread(self)
            self._thread.start()
            self.client.moveToThread(self._thread)
            self.sig_move_requested.connect(self.client.client_move)

            lay = QtWidgets.QVBoxLayout(self)
            lay.addWidget(self.btn_move)
            lay.addWidget(self.btn_connect)
            self.resize(320, 120)

            current_dir = os.path.dirname(os.path.realpath(__file__))
            filename = os.path.join(current_dir, "Simple_State_Machine.scxml")

            machine = QtScxml.QScxmlStateMachine.fromFile(filename)
            machine.setParent(self)

            # Report problems found while parsing the SCXML document.
            for error in machine.parseErrors():
                print(error.toString())

            # Each slot is called with a bool telling whether its state is active.
            machine.connectToState("unknown", self, QtCore.SLOT("on_unknown_state_enter(bool)"))
            machine.connectToState("ready", self, QtCore.SLOT("on_ready_state_enter(bool)"))
            machine.connectToState("moving", self, QtCore.SLOT("on_moving_state_enter(bool)"))

            # GUI -> machine events
            self.btn_connect.clicked.connect(partial(machine.submitEvent, "connect"))
            self.btn_move.clicked.connect(partial(machine.submitEvent, "move"))

            # client -> machine events
            self.client.sig_disconnected.connect(partial(machine.submitEvent, "disconnect"))
            self.client.sig_connected.connect(partial(machine.submitEvent, "connect"))
            self.client.sig_move_done.connect(partial(machine.submitEvent, "stopped"))

            machine.start()

        @QtCore.Slot(bool)
        def on_unknown_state_enter(self, active):
            """When 'unknown' becomes active: only '(re-)connect' is available."""
            if active:
                print("unknown_state")
                self.btn_move.setDisabled(True)
                self.btn_connect.setEnabled(True)

        @QtCore.Slot(bool)
        def on_ready_state_enter(self, active):
            """When 'ready' becomes active: only 'move' is available."""
            if active:
                print("ready_state")
                self.btn_move.setEnabled(True)
                self.btn_connect.setDisabled(True)

        @QtCore.Slot(bool)
        def on_moving_state_enter(self, active):
            """When 'moving' becomes active: disable both buttons and request a move.

            The destination is a random integer in [1, 100).
            """
            if active:
                print("moving_state")
                self.btn_move.setDisabled(True)
                self.btn_connect.setDisabled(True)
                dest = int(np.random.randint(1, 100))
                self.sig_move_requested.emit(dest)

        def closeEvent(self, event):
            """Stop the worker thread and wait for it before the window closes."""
            self._thread.quit()
            self._thread.wait()
            super().closeEvent(event)
    
    
    if __name__ == "__main__":
        import sys

        # The application object has to exist before any widget is created.
        qt_app = QtWidgets.QApplication(sys.argv)

        window = GUI()
        window.show()

        # Run the event loop and use its return code as the process exit status.
        exit_code = qt_app.exec_()
        sys.exit(exit_code)