Search code examples
pythonpython-3.xmultiprocessingpython-multiprocessing

How to separately start and stop multiprocessing processes in Python?


I use a dedicated Python (3.8) library to control a motor drive via a USB port.

The Python library provided by the motor control drive manufacturers (ODrive) allows a single Python process to control one or more drives.

However, I would like to run 3 processes, each controlling 1 drive.

After researching options (I first considered virtual machines, Docker containers, and multi-threading) I began believing that the easiest way to do so would be to use multiprocessing.

My problem is that I would then need a way to manage (i.e., start, monitor, and stop independently) multiple processes. The practical reason behind it is that motors are connected to different setups. Each setup must be able to be stopped and restarted separate if malfunctioning, for instance, but other running setups should not be affected by this action.

After reading around the internet and Stack Overflow, I now understand how to create a Pool of processing, how to associate processes with processor cores, how to start a pool of processes, and queuing/joining them (the latter not being needed for me).

What I don't know is how to manage them independently. How can I separately start/stop different processes without affecting the execution of the others? Are there libraries to manage them (perhaps even with a GUI)?


Solution

  • I'd probably do something like this:

    import random
    import time
    from multiprocessing import Process, Queue
    
    
    class MotorProcess:
        def __init__(self, name, com_related_params):
            self.name = name
            # Made up some parameters relating to communication
            self._params = com_related_params
            self._command_queue = Queue()
            self._status_queue = Queue()
            self._process = None
    
        def start(self):
            if self._process and self._process.is_alive():
                return
            self._process = Process(target=self.run_processing,
                                    args=(self._command_queue, self._status_queue,
                                          self._params))
            self._process.start()
    
        @staticmethod
        def run_processing(command_queue, status_queue, params):
            while True:
                # Check for commands
                if not command_queue.empty():
                    msg = command_queue.get(block=True, timeout=0.05)
                    if msg == "stop motor":
                        status_queue.put("Stopping motor")
                    elif msg == "exit":
                        return
                    elif msg.startswith("move"):
                        status_queue.put("moving motor to blah")
                        # TODO: msg parsing and move motor
                    else:
                        status_queue.put("unknown command")
    
                # Update status
                # TODO: query motor status
                status_queue.put(f"Motor is {random.randint(0, 100)}")
                time.sleep(0.5)
    
        def is_alive(self):
            if self._process and self._process.is_alive():
                return True
            return False
    
        def get_status(self):
            if not self.is_alive():
                return ["not running"]
            # Empty the queue
            recent = []
            while not self._status_queue.empty():
                recent.append(self._status_queue.get(False))
            return recent
    
        def stop_process(self):
            if not self.is_alive():
                return
            self._command_queue.put("exit")
            # Empty the stats queue otherwise it could potentially stop
            # the process from closing.
            while not self._status_queue.empty():
                self._status_queue.get()
    
            self._process.join()
    
        def send_command(self, command):
            self._command_queue.put(command)
    
    
    if __name__ == "__main__":
        processes = [MotorProcess("1", None), MotorProcess("2", None)]
    
        while True:
            cmd = input()
            if cmd == "start 1":
                processes[0].start()
            elif cmd == "move 1 to 100":
                processes[0].send_command("move to 100")
            elif cmd == "exit 1":
                processes[0].stop_process()
            else:
                for n, p in enumerate(processes):
                    print(f"motor {n + 1}", end="\n\t")
                    print("\n\t".join(p.get_status()))
    

    Not production ready (e.g. no exception handling, no proper command parsing, etc.) but shows the idea. Shout if there are any problems :D