Search code examples
pythonmultithreadinginputpython-multithreading

How to control a Thread via input()?


I want to run a code with process parallel to my main code but also want to access its parameters or start/stop the process via command prompt.

my machine is win7 64bit. Something in mind is:

from multiprocessing import Process

class dllapi():
    ...

def apiloop(params, args):
    apiclient = dllapi(**args)
    while True:
        apiclient.cycle()
        params = [....]

def mainloop(args):
    p = Process(target = apiloop, args=(params, args, ))
    while True:
        cmd = input()
        if cmd == 'kill':
            p.terminate()
        if cmd == 'stop':
            pass # no idea
        if cmd == 'resume':
            pass # no idea
        if cmd == 'report':
            print (params)

I wish to make it simple. I did tried to make apiloop as thread yet input() could freeze the program and stopped apiloop working until i pressed enter...

To share the parameters from apiloop process, i did try queue and pipe, but, seem to me, queue needs .join to wait until apiloop is done and pipe has buffer limit.

(actually i can make apiclient.cycle runs every 1s but i wish to keep apiclient alive)

I wish to know if it's worth to dig deeper with multiprocessing (e.g. will try manager as well...) or there are other approaches which is more suitable for my case. Thanks in advance...

* UPDATED: 201809170953*

Some progress with manager as below:

from multiprocessing import Process, Manager

class dllapi():
    ...
class webclientapi():
    ...

def apiloop(args, cmd, params):
    apiclient = dllapi(**args)
    status = True
    while True:
        # command from main
        if cmd == 'stop':
            status = False
        elif cmd == 'start':
            status = True
        cmd = None
        # stop or run
        if status == True:
            apiclient.cycle()
        # update parameters
        params['status'] = status

def uploadloop(cmds, params):
    uploadclient = webclientapi()
    status = True
    while True:
        # command from main
        if cmd == 'stop':
            status = False
        elif cmd == 'start':
            status = True
        cmd = None
        # stop or run
        if status == True:
            # upload 'status' from apiclient to somewhere
            uploadclient.cycle(params['status'])

def mainloop(args):

    manager = Manager()
    mpcmds = {}
    mpparams = {}
    mps = {}

    mpcmds   ['apiloop'] = manager.Value('u', 'start')
    mpparams ['apiloop'] = manager.dict()
    mps      ['apiloop'] = Process(target = apiloop, args=(args, mpcmds['apiloop'], mpparams['apiloop'])

    mpcmds   ['uploadloop'] = manager.Value('u', 'start')
    # mpparams ['uploadloop'] is directly from mpparams ['apiloop']
    mps      ['uploadloop'] = Process(target = uploadloop, args=(mpcmds['uploadloop'], mpparams['apiloop'])

    for key, mp in mps.items():
        mp.daemon = True
        mp.start()

    while True:
        cmd = input().split(' ')
        # kill daemon process with exit()
        if cmd[0] == 'bye':
            exit()
        # kill individual process
        if cmd[0] == 'kill':
            mps[cmd[1]].terminate()
        # stop individual process via command
        if cmd[0] == 'stop':
            mpcmds[cmd[1]] = 'stop'
        # stop individual process via command
        if cmd[0] == 'start':
            mpcmds[cmd[1]] = 'start'
        # report individual process info via command
        if cmd[0] == 'report':
            print (mpparams ['apiloop'])

Hope this'd help someone.


Solution

  • I'm showing you how to solve the general problem with threads only, because that is what you tried first and your example doesn't bring up the need for a child-process.

    In the example below your dllapi class is named Zoo and it's subclassing threading.Thread, adding some methods to allow execution control. It takes some data upon initialization and its cycle-method simply iterates repeatedly over this data and just counts how many times it has seen the specific item.

    import time
    import logging
    from queue import Queue
    from threading import Thread
    
    from itertools import count, cycle
    
    
    class Zoo(Thread):
    
        _ids = count(1)
    
        def __init__(self, cmd_queue, data, *args,
                 log_level=logging.DEBUG, **kwargs):
    
            super().__init__()
            self.name = f'{self.__class__.__name__.lower()}-{next(self._ids)}'
            self.data = data
            self.log_level = log_level
            self.args = args
            self.kwargs = kwargs
    
            self.logger = self._init_logging()
            self.cmd_queue = cmd_queue
    
            self.data_size = len(data)
            self.actual_item = None
            self.iter_cnt = 0
            self.cnt = count(1)
            self.cyc = cycle(self.data)
    
        def cycle(self):
            item = next(self.cyc)
            if next(self.cnt) % self.data_size == 0:  # new iteration round
                self.iter_cnt += 1
            self.actual_item = f'{item}_{self.iter_cnt}'
    
        def run(self):
            """
            Run is the main-function in the new thread. Here we overwrite run
            inherited from threading.Thread.
            """
            while True:
                if self.cmd_queue.empty():
                    self.cycle()
                    time.sleep(1)  # optional heartbeat
                else:
                    self._get_cmd()
                    self.cmd_queue.task_done()  # unblocks prompter
    
        def stop(self):
            self.logger.info(f'stopping with actual item: {self.actual_item}')
            # do clean up
            raise SystemExit
    
        def pause(self):
            self.logger.info(f'pausing with actual item: {self.actual_item}')
            self.cmd_queue.task_done()  # unblocks producer joining the queue
            self._get_cmd()  # just wait blockingly until next command
    
        def resume(self):
            self.logger.info(f'resuming with actual item: {self.actual_item}')
    
        def report(self):
            self.logger.info(f'reporting with actual item: {self.actual_item}')
            print(f'completed {self.iter_cnt} iterations over data')
    
        def _init_logging(self):
            fmt = '[%(asctime)s %(levelname)-8s %(threadName)s' \
              ' %(funcName)s()] --- %(message)s'
            logging.basicConfig(format=fmt, level=self.log_level)
            return logging.getLogger()
    
        def _get_cmd(self):
            cmd = self.cmd_queue.get()
            try:
                self.__class__.__dict__[cmd](self)
            except KeyError:
                print(f'Command `{cmd}` is unknown.')
    

    input is a blocking function. You need to outsource it in a separate thread so it doesn't block your main-thread. In the example below input is wrapped in Prompter, a class subclassing threading.Thread. Prompter passes inputs into a command-queue. This command-queue is read by Zoo.

    class Prompter(Thread):
        """Prompt user for command input.
        Runs in a separate thread so the main-thread does not block.
        """
        def __init__(self, cmd_queue):
            super().__init__()
            self.cmd_queue = cmd_queue
    
        def run(self):
            while True:
                cmd = input('prompt> ')
                self.cmd_queue.put(cmd)
                self.cmd_queue.join()  # blocks until consumer calls task_done()
    
    
    if __name__ == '__main__':
    
        data = ['ape', 'bear', 'cat', 'dog', 'elephant', 'frog']
    
        cmd_queue = Queue()
        prompter = Prompter(cmd_queue=cmd_queue)
        prompter.daemon = True
    
        zoo = Zoo(cmd_queue=cmd_queue, data=data)
    
        prompter.start()
        zoo.start()
    

    Example session in terminal:

    $python control_thread_over_prompt.py
    prompt> report
    [2018-09-16 17:59:16,856 INFO     zoo-1 report()] --- reporting with actual item: dog_0
    completed 0 iterations over data
    prompt> pause
    [2018-09-16 17:59:26,864 INFO     zoo-1 pause()] --- pausing with actual item: bear_2
    prompt> resume
    [2018-09-16 17:59:33,291 INFO     zoo-1 resume()] --- resuming with actual item: bear_2
    prompt> report
    [2018-09-16 17:59:38,296 INFO     zoo-1 report()] --- reporting with actual item: ape_3
    completed 3 iterations over data
    prompt> stop
    [2018-09-16 17:59:42,301 INFO     zoo-1 stop()] --- stopping with actual item: elephant_3