Search code examples
python multiprocessing qthread

How do I access a variable inside a multiprocessing worker that is itself contained in a QThread?


I'm trying to access a variable in a multiprocessing worker within a QThread.

I made a minimal example to highlight my point:

from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *

import sys
import numpy as np
import multiprocessing

class my_Thread(QThread):
    """QThread whose run() launches the multiprocessing job.

    Attributes set in __init__: M, N (matrix dimensions handed to each worker)
    and nbCore (number of worker processes).
    """
    # Emitted explicitly at the end of run().
    finished = pyqtSignal()

    def __init__(self,M=100, N=100, nbCore=2):
        """Store the job parameters; nothing is started here."""
        QThread.__init__(self, )
        self.M = M
        self.N = N
        self.nbCore = nbCore

    def run(self):
        """Thread body: build the worker helper, run the job, then emit finished."""
        self.my_worker = mp_worker_class()
        # start() has no return statement, so self.Mean ends up as None.
        self.Mean = self.my_worker.start(self.nbCore, self.M, self.N)
        self.finished.emit()

    def returninfo(self):
        """Return mp_worker_class.nbiter as seen from this (parent) process.

        self.my_worker is only assigned inside run(), so this raises
        AttributeError if called before run() has begun.
        The increments happen in the child processes, each on its own copy,
        which is why the value read here does not change.
        """
        return self.my_worker.nbiter

class mp_worker_class():
    """Runs one child process per random matrix and prints each matrix mean."""

    # Class-level iteration counter. Each child process works on its own copy
    # of this attribute, so increments made in a child are not visible here.
    nbiter = 0
    def __init__(self,):
        pass

    @classmethod
    def start(self, nbCore=2, M=100, N=100 ):
        """Launch nbCore child processes, one per M-by-N random matrix.

        Blocks until every child has sent its result through its pipe, and
        prints each result. Has no return statement (returns None).
        """
        # As a classmethod, `self` is the class: this resets the class attribute.
        self.nbiter = 0
        X = np.random.rand(nbCore,M,N)
        pipe_list = []
        for i in range(nbCore):
            # One pipe per child; the child gets the sending end via args.
            recv_end, send_end = multiprocessing.Pipe()
            p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end))
            p.start()
            pipe_list.append(recv_end)

        # recv() blocks until the corresponding child has sent its result.
        for idx, recv_end in enumerate(pipe_list):
            Ymean =recv_end.recv()
            print(Ymean)

    @classmethod
    def mp_worker(self, X=None, send_end=None):
        """Child-process body: average X element by element and send [mean].

        X is indexed as X[i, j] over X.shape[0] and X.shape[1].
        """
        mean = 0
        nb =0
        for i in range(X.shape[0]):
            for j in range(X.shape[1]):
                # print(self.nbiter)
                mean += X[i,j]
                nb += 1
                # Runs in the child: only the child's copy of nbiter changes.
                self.nbiter += 1
        mean /= nb
        send_end.send([mean])


class GUI(QMainWindow):
    """Main window with a line edit showing the counter and a Start button."""

    def __init__(self, parent=None):
        """Build the widgets.

        parent: stored as self.parent and later used for processEvents();
        the __main__ block passes the QApplication here.
        """
        super(GUI, self).__init__()
        self.parent = parent

        self.centralWidget = QWidget()
        self.setCentralWidget(self.centralWidget)

        self.VBOX = QVBoxLayout()
        self.info_LE = QLineEdit()
        self.start_PB = QPushButton('Start')
        self.start_PB.clicked.connect(self.startThread)
        self.VBOX.addWidget(self.info_LE)
        self.VBOX.addWidget(self.start_PB)

        self.centralWidget.setLayout(self.VBOX)

    def startThread(self):
        """Create the worker thread and a 100 ms polling timer, then start both."""
        self.thread = my_Thread(M=10000, N=10000, nbCore=5)
        self.thread.finished.connect(self.threadFinished)

        self.timer = QTimer()
        self.timer.setInterval(100)
        # Default arguments bind the line edit and this thread into the callback.
        self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
        self.timer.start()
        # Started here but not read anywhere in this snippet.
        self.ElapsedTimer = QElapsedTimer()
        self.ElapsedTimer.start()

        self.thread.start()

    def threadFinished(self):
        """Slot for the thread's finished signal: stop polling and report."""
        self.timer.stop()
        self.thread.exit()
        print('Finished')

    def updatemsgs(self, msg, Obj):
        """Timer callback: read the counter from Obj and show it in msg.

        msg: widget exposing setText(); Obj: object exposing returninfo().
        """
        nbiter = Obj.returninfo()
        print(nbiter)
        msg.setText(str(nbiter))
        # self.parent is whatever was passed to __init__ (None by default).
        self.parent.processEvents()

# Script entry point: create the application, show the window, run the event loop.
if __name__ == "__main__":
    app = QApplication(sys.argv)
    # The QApplication is passed as GUI's `parent` so updatemsgs() can call
    # processEvents() on it.
    ex = GUI(app)
    ex.show()
    sys.exit(app.exec())

In this example, I created the class my_Thread, which inherits from QThread. In this QThread class I call a multiprocessing worker through the class mp_worker_class, which calls the function mp_worker 5 times in parallel. In the class mp_worker_class I have a variable nbiter = 0, which is increased by one each time I go through the loop in the function mp_worker. I can verify that nbiter is indeed increasing because I can see its value through a print. But from the my_Thread.returninfo() function, where I just return the nbiter value from the mp_worker_class class, I just get zero.

What I would like is to print the value of mp_worker_class.nbiter in the PyQt5 QLineEdit widget (info_LE) that I can see in the GUI. I update the text every 0.1 seconds. For now it just prints zeros.


Solution

  • Child processes in Python do not share memory by default—code running in one process cannot access or change the memory used by another process. This means that each process has its own copy of each of the variables you are using, including the mp_worker_class.nbiter variable. Hence, you don't see changes that a child process makes to its mp_worker_class.nbiter variable from the parent process (or any of the other child processes, for that matter.)

    As you have seen, we can get data from a parent process to a child process by using the args keyword argument to the multiprocessing.Process constructor. However, this simply copies the data from the parent to the child; we're still not sharing memory between the two processes.

    import multiprocessing
    
    def my_example(arg):
        """Append 35 to ``arg`` in place, then print the resulting list.

        Run as a child-process target, this mutates only the child's copy of
        the list it was given.
        """
        arg.extend([35])
        label = "Child arg:"
        print(label, arg)
        
    # Demonstration: a list passed through `args` is copied into the child,
    # so the append performed there is not visible in the parent.
    if __name__ == "__main__":
        l = [1,2,3]
        print("Before:",l)
        p = multiprocessing.Process(target=my_example, args=(l,))
        p.start()
        # Wait for the child to finish before inspecting the parent's list.
        p.join()
        print("After:",l)
        
    # Before: [1, 2, 3]
    # Child arg: [1, 2, 3, 35]
    # After: [1, 2, 3]
    

    Fortunately, multiprocessing provides a Value class which makes it easy to create a variable in shared memory. The key is to create the Value in the parent process and then distribute the Value to the child processes through the args parameter to multiprocessing.Process.


    In your code, you could create your multiprocessing.Value in the constructor for my_Thread. For example, you could add

    self.nbiter = multiprocessing.Value('i',0)
    

    This creates an integer multiprocessing.Value (that is what i means) and initializes it to 0. You could then pass this Value to the self.my_worker.start classmethod, which could, in turn, pass the Value to its child mp_worker processes.

    The raw value associated with a multiprocessing.Value object can be accessed via its value attribute. Hence, you would need to change the code in your mp_worker classmethod to change the value attribute of your multiprocessing.Value object.

    You would also need to account for the fact that the += operation will not be atomic when using multiprocessing.Value. Hence, your code needs to acquire a lock on the Value before incrementing. If you create a new argument to mp_worker called nbiter, your code for incrementing nbiter should look like this.

    with nbiter.get_lock():
        nbiter.value += 1
    

    You will also need to change your my_Thread.returninfo method to simply return self.nbiter.value. You may also want to set self.nbiter.value = 0 at the beginning of your my_Thread.run method if my_Thread will be restarted for some reason.

    In summary, your code could look something like this.

    from PyQt5.QtGui import *
    from PyQt5.QtCore import *
    from PyQt5.QtWidgets import *
    
    import sys
    import numpy as np
    import multiprocessing
    
    class my_Thread(QThread):
        """QThread that runs the multiprocessing job and exposes its progress.

        Progress lives in ``self.nbiter``, an integer ``multiprocessing.Value``
        created here in the parent process and handed to the workers.
        """

        # Emitted explicitly at the end of run().
        finished = pyqtSignal()

        def __init__(self, M=100, N=100, nbCore=2):
            """Record the job size and create the shared progress counter."""
            super().__init__()
            # 'i' selects a signed C int; the counter starts at 0.
            self.nbiter = multiprocessing.Value('i', 0)
            self.nbCore = nbCore
            self.N = N
            self.M = M

        def run(self):
            """Thread body: reset the counter, run the job, then emit finished."""
            # Reset first so a re-run of the same thread object starts from zero.
            self.nbiter.value = 0
            worker = mp_worker_class()
            self.my_worker = worker
            # self.Mean holds whatever start() returns.
            self.Mean = worker.start(self.nbCore, self.M, self.N, self.nbiter)
            self.finished.emit()

        def returninfo(self):
            """Return the current value of the shared progress counter."""
            shared_counter = self.nbiter
            return shared_counter.value
    
    class mp_worker_class():
        """Average random matrices in child processes, one process per matrix.

        Progress can be reported through an optional shared
        ``multiprocessing.Value`` that each child increments once per element.
        """

        def __init__(self,):
            pass

        @classmethod
        def start(cls, nbCore=2, M=100, N=100, nbiter=None):
            """Run ``nbCore`` workers, each on its own M-by-N random matrix.

            Args:
                nbCore: number of child processes (and matrices).
                M, N: dimensions of each matrix.
                nbiter: optional integer ``multiprocessing.Value`` used as a
                    shared progress counter; ``None`` disables counting.

            Returns:
                list of the per-matrix means, in launch order.

            Raises:
                EOFError: a child exited without sending its result.
            """
            X = np.random.rand(nbCore, M, N)
            jobs = []
            for i in range(nbCore):
                # One-way pipe: the child only sends, the parent only receives.
                recv_end, send_end = multiprocessing.Pipe(duplex=False)
                p = multiprocessing.Process(
                    target=cls.mp_worker,
                    args=(X[i, :, :], send_end, nbiter),
                )
                p.start()
                # Drop the parent's copy of the sending end so that recv()
                # raises EOFError instead of blocking forever if the child
                # dies before sending.
                send_end.close()
                jobs.append((p, recv_end))

            means = []
            try:
                for _, recv_end in jobs:
                    Ymean = recv_end.recv()
                    print(Ymean)
                    means.append(Ymean[0])
            finally:
                # Always release the pipes and reap the children.
                for p, recv_end in jobs:
                    recv_end.close()
                    p.join()
            return means

        @classmethod
        def mp_worker(cls, X=None, send_end=None, nbiter=None):
            """Child-process body: average ``X`` element by element.

            Sends ``[mean]`` through ``send_end``. When ``nbiter`` is given it
            is incremented once per element visited.
            """
            mean = 0
            nb = 0
            for i in range(X.shape[0]):
                for j in range(X.shape[1]):
                    mean += X[i, j]
                    nb += 1
                    if nbiter is not None:
                        # += on a shared Value is a read-modify-write, so it
                        # has to happen while holding the Value's lock.
                        with nbiter.get_lock():
                            nbiter.value += 1
            mean /= nb
            send_end.send([mean])
    
    
    class GUI(QMainWindow):
        """Main window with a line edit showing progress and a Start button.

        Only one job runs at a time: the Start button is disabled while a
        worker thread is active and re-enabled when it reports completion.
        """

        def __init__(self, parent=None):
            """Build the widgets.

            parent: optional object exposing processEvents() (the __main__
            block passes the QApplication); stored as self.parent.
            """
            super(GUI, self).__init__()
            self.parent = parent

            self.centralWidget = QWidget()
            self.setCentralWidget(self.centralWidget)

            self.VBOX = QVBoxLayout()
            self.info_LE = QLineEdit()
            self.start_PB = QPushButton('Start')
            self.start_PB.clicked.connect(self.startThread)
            self.VBOX.addWidget(self.info_LE)
            self.VBOX.addWidget(self.start_PB)

            self.centralWidget.setLayout(self.VBOX)

        def startThread(self):
            """Create the worker thread and a 100 ms polling timer, then start both."""
            # A second click would replace self.thread and self.timer while
            # the first job is still running, so block it until the job ends.
            self.start_PB.setEnabled(False)

            self.thread = my_Thread(M=10000, N=10000, nbCore=5)
            self.thread.finished.connect(self.threadFinished)

            self.timer = QTimer()
            self.timer.setInterval(100)
            # Default arguments bind the line edit and this thread into the callback.
            self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
            self.timer.start()
            self.ElapsedTimer = QElapsedTimer()
            self.ElapsedTimer.start()

            self.thread.start()

        def threadFinished(self):
            """Slot for the thread's finished signal: stop polling and report."""
            self.timer.stop()
            # The signal is emitted from inside run(), so wait for run() to
            # actually return before the thread object may be replaced.
            self.thread.wait()
            # One last refresh: the final count may have been reached after
            # the timer's last tick.
            self.updatemsgs(self.info_LE, self.thread)
            self.start_PB.setEnabled(True)
            print('Finished')

        def updatemsgs(self, msg, Obj):
            """Read the counter from Obj and show it in msg.

            msg: widget exposing setText(); Obj: object exposing returninfo().
            """
            nbiter = Obj.returninfo()
            print(nbiter)
            msg.setText(str(nbiter))
            # parent defaults to None in __init__; skip the flush in that case.
            if self.parent is not None:
                self.parent.processEvents()
    
    # Script entry point: create the application, show the window, run the event loop.
    if __name__ == "__main__":
        app = QApplication(sys.argv)
        # The QApplication is passed as GUI's `parent` so updatemsgs() can call
        # processEvents() on it.
        ex = GUI(app)
        ex.show()
        sys.exit(app.exec())