Search code examples
pythonmultithreadingpython-multithreadingproducer-consumer

How to make changes to or call a method in a running thread in Python?


I have a producer thread that produces data from a serial connection and puts them into multiple queues that will be used by different consumer threads. However, I'd like to be able to add in additional queues (additional consumers) from the main thread after the producer thread has already started running.

I.e. In the code below, how could I add a Queue to listOfQueues from the main thread while this thread is running? Can I add in a method such as addQueue(newQueue) to this class which appends to it listOfQueues? This doesn't seem likely as the thread will be in the run method. Can I create some sort of Event similar to the stop event?

class ProducerThread(threading.Thread):
    def __init__(self, listOfQueues):
        super(ProducerThread, self).__init__()
        self.listOfQueues = listOfQueues
        self._stop_event = threading.Event() # Flag to be set when the thread should stop

    def run(self):
        ser = serial.Serial() # Some serial connection 

        while(not self.stopped()):
            try:
                bytestring = ser.readline() # Serial connection or "producer" at some rate
                for q in self.listOfQueues:
                    q.put(bytestring)
            except serial.SerialException:
                continue

    def stop(self):
        '''
        Call this function to stop the thread. Must also use .join() in the main
        thread to fully ensure the thread has completed.
        :return: 
        '''
        self._stop_event.set()

    def stopped(self):
        '''
        Call this function to determine if the thread has stopped. 
        :return: boolean True or False
        '''
        return self._stop_event.is_set()

Solution

  • Sure, you can simply have an append function that adds to your list. E.g.

    def append(self, element):
        self.listOfQueues.append(element)
    

    That will work even after your thread's start() method has been called.

    Edit: for non thread-safe procedures you can use a lock, e.g.:

    def unsafe(self, element):
        with self.lock:
            # do stuff
    

    You would then also need to add the lock inside your run method, e.g.:

    with lock:
        for q in self.listOfQueues:
            q.put(bytestring)
    

    Any code acquiring a lock will wait for the lock to be released elsewhere.