I'm trying to access to a variable in a multiprocessing worker within a QThread.
I made a minimal example to highlight my point:
from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import sys
import numpy as np
import multiprocessing
class my_Thread(QThread):
finished = pyqtSignal()
def __init__(self,M=100, N=100, nbCore=2):
QThread.__init__(self, )
self.M = M
self.N = N
self.nbCore = nbCore
def run(self):
self.my_worker = mp_worker_class()
self.Mean = self.my_worker.start(self.nbCore, self.M, self.N)
self.finished.emit()
def returninfo(self):
return self.my_worker.nbiter
class mp_worker_class():
nbiter = 0
def __init__(self,):
pass
@classmethod
def start(self, nbCore=2, M=100, N=100 ):
self.nbiter = 0
X = np.random.rand(nbCore,M,N)
pipe_list = []
for i in range(nbCore):
recv_end, send_end = multiprocessing.Pipe()
p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end))
p.start()
pipe_list.append(recv_end)
for idx, recv_end in enumerate(pipe_list):
Ymean =recv_end.recv()
print(Ymean)
@classmethod
def mp_worker(self, X=None, send_end=None):
mean = 0
nb =0
for i in range(X.shape[0]):
for j in range(X.shape[1]):
# print(self.nbiter)
mean += X[i,j]
nb += 1
self.nbiter += 1
mean /= nb
send_end.send([mean])
class GUI(QMainWindow):
def __init__(self, parent=None):
super(GUI, self).__init__()
self.parent = parent
self.centralWidget = QWidget()
self.setCentralWidget(self.centralWidget)
self.VBOX = QVBoxLayout()
self.info_LE = QLineEdit()
self.start_PB = QPushButton('Start')
self.start_PB.clicked.connect(self.startThread)
self.VBOX.addWidget(self.info_LE)
self.VBOX.addWidget(self.start_PB)
self.centralWidget.setLayout(self.VBOX)
def startThread(self):
self.thread = my_Thread(M=10000, N=10000, nbCore=5)
self.thread.finished.connect(self.threadFinished)
self.timer = QTimer()
self.timer.setInterval(100)
self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
self.timer.start()
self.ElapsedTimer = QElapsedTimer()
self.ElapsedTimer.start()
self.thread.start()
def threadFinished(self):
self.timer.stop()
self.thread.exit()
print('Finished')
def updatemsgs(self, msg, Obj):
nbiter = Obj.returninfo()
print(nbiter)
msg.setText(str(nbiter))
self.parent.processEvents()
if __name__ == "__main__":
app = QApplication(sys.argv)
ex = GUI(app)
ex.show()
sys.exit(app.exec())
In this example, I created the class my_Thread
which inherits from Qthread
. In this QThread
class I call a multiprocessing worker through the class mp_worker_class
which calls 5 times the function mp_worker
in parallel. In the class mp_worker_class
I have a variable nbiter = 0
which is increased by one each time I do a loop in the function mp_worker
. I can verify nbiter
is indeed increasing because I can see its value through a print. But from the my_Thread.returninfo()
function from where I just return the nbiter
value from the mp_worker_class
class, I just get zero.
What I would like is print the value of mp_worker_class.nbiter
in the pyqt5 QlineEdit wighet (info_LE) that I can see in the GUI. I update the text every 0.1 second. For now it just prints zeros.
Child processes in Python do not share memory by default—code running in one process cannot access or change the memory used by another process. This means that each process has its own copy of each of the variables you are using, including the mp_worker_class.nbiter
variable. Hence, you don't see changes that a child process makes to its mp_worker_class.nbiter
variable from the parent process (or any of the other child processes, for that matter.)
As you have seen, we can get data from a parent process to a child process by using the args
keyword argument to the multiprocessing.Process
constructor. However, this simply copies the data from the parent to the child; we're still not sharing memory between the two processes.
import multiprocessing
def my_example(arg):
arg.append(35)
print("Child arg:",arg)
if __name__ == "__main__":
l = [1,2,3]
print("Before:",l)
p = multiprocessing.Process(target=my_example, args=(l,))
p.start()
p.join()
print("After:",l)
# Before: [1, 2, 3]
# Child arg: [1, 2, 3, 35]
# After: [1, 2, 3]
Fortunately, multiprocessing
provides a Value class which makes it easy to create a variable in shared memory. The key is to create the Value
in the parent process and then distribute the Value
to the child processes through the args
parameter to multiprocessing.Process
.
In your code, you could create your multiprocessing.Value
in the constructor for my_Thread
. For example, you could add
self.nbiter = multiprocessing.Value('i',0)
This creates an integer multiprocessing.Value
(that is what i
means) and initializes it to 0. You could then pass this Value
to the self.my_worker.start
classmethod, which could, in turn, pass the Value
to its child mp_worker
processes.
The raw value associated with a multiprocessing.Value
object can be accessed via its value
attribute. Hence, you would need to change the code in your mp_worker
classmethod to change the value
attribute of your multiprocessing.Value
object.
You would also need to account for the fact that the +=
operation will not be atomic when using multiprocessing.Value
. Hence, your code needs to acquire a lock on the Value
before incrementing. If created a new argument to mp_worker
called nbiter
, your code for incrementing nbiter
should look like this.
with nbiter.get_lock():
nbiter.value += 1
You will also need to change your my_Thread.returninfo
method to simply return self.nbiter.value
. You may also want to set self.nbiter.value = 0
at the beginning of your my_Thread.run
method if my_Thread
will be restarted for some reason.
In summary, your code could look something like this.
from PyQt5.QtGui import *
from PyQt5.QtCore import *
from PyQt5.QtWidgets import *
import sys
import numpy as np
import multiprocessing
class my_Thread(QThread):
finished = pyqtSignal()
def __init__(self,M=100, N=100, nbCore=2):
QThread.__init__(self, )
self.M = M
self.N = N
self.nbCore = nbCore
self.nbiter = multiprocessing.Value('i',0)
def run(self):
self.nbiter.value = 0
self.my_worker = mp_worker_class()
self.Mean = self.my_worker.start(self.nbCore, self.M, self.N, self.nbiter)
self.finished.emit()
def returninfo(self):
return self.nbiter.value
class mp_worker_class():
def __init__(self,):
pass
@classmethod
def start(self, nbCore=2, M=100, N=100, nbiter=None ):
X = np.random.rand(nbCore,M,N)
pipe_list = []
for i in range(nbCore):
recv_end, send_end = multiprocessing.Pipe()
p = multiprocessing.Process(target=self.mp_worker, args=(X[i,:,:] , send_end, nbiter))
p.start()
pipe_list.append(recv_end)
for idx, recv_end in enumerate(pipe_list):
Ymean =recv_end.recv()
print(Ymean)
@classmethod
def mp_worker(self, X=None, send_end=None, nbiter=None):
mean = 0
nb =0
for i in range(X.shape[0]):
for j in range(X.shape[1]):
# print(self.nbiter)
mean += X[i,j]
nb += 1
with nbiter.get_lock():
nbiter.value += 1
mean /= nb
send_end.send([mean])
class GUI(QMainWindow):
def __init__(self, parent=None):
super(GUI, self).__init__()
self.parent = parent
self.centralWidget = QWidget()
self.setCentralWidget(self.centralWidget)
self.VBOX = QVBoxLayout()
self.info_LE = QLineEdit()
self.start_PB = QPushButton('Start')
self.start_PB.clicked.connect(self.startThread)
self.VBOX.addWidget(self.info_LE)
self.VBOX.addWidget(self.start_PB)
self.centralWidget.setLayout(self.VBOX)
def startThread(self):
self.thread = my_Thread(M=10000, N=10000, nbCore=5)
self.thread.finished.connect(self.threadFinished)
self.timer = QTimer()
self.timer.setInterval(100)
self.timer.timeout.connect(lambda msg=self.info_LE, x=self.thread: self.updatemsgs(msg, x))
self.timer.start()
self.ElapsedTimer = QElapsedTimer()
self.ElapsedTimer.start()
self.thread.start()
def threadFinished(self):
self.timer.stop()
self.thread.exit()
print('Finished')
def updatemsgs(self, msg, Obj):
nbiter = Obj.returninfo()
print(nbiter)
msg.setText(str(nbiter))
self.parent.processEvents()
if __name__ == "__main__":
app = QApplication(sys.argv)
ex = GUI(app)
ex.show()
sys.exit(app.exec())