I created a GUI with PyQt5
, and I wand to test it through pytest
.
I my GUI requires to redirect the standard output, so I use a Qthread
to create a listener.
This listener put the stdout
in a Queue
and send a signal which exploited by the GUI.
Until here, there is no problem. My issue appears when I what to quit; when I quit using python interpreter I have no problem, but when I use pytest
I get EOFError
or a message saying that I kill a running thread.
I tried to quit correctly but the problem persist, so I came for help.
Here is an example of the GUI.py
:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import sys
from functools import partial
import multiprocessing
from PyQt5 import QtGui, QtCore, QtWidgets, QtTest
from PyQt5.QtWidgets import *
from PyQt5.QtCore import QCoreApplication, Qt, QObject, pyqtSignal, pyqtSlot, QThread
from PyQt5.QtGui import QIcon, QTextCursor
class MyReceiver(QObject):
mysignal = pyqtSignal(str)
def __init__(self,queue,*args,**kwargs):
QObject.__init__(self,*args,**kwargs)
self.queue = queue
self.runCondition=True
@pyqtSlot(str)
def run(self):
while self.runCondition:
text = self.queue.get()
self.mysignal.emit(text)
def QueueStreamSetup():
queue = multiprocessing.Queue(-1)
sys.stdout = WriteStream(queue)
#sys.stderr = WriteStream(queue)
return queue
class WriteStream(object):
def __init__(self,queue):
self.queue = queue
def write(self, text):
self.queue.put(text)
def flush(self):
self.queue.put('FLUSH ')
QtTest.QTest.qWait(2 * 1000)
pass
def threadConnect(view, queue):
qthread = QThread()
my_receiver = MyReceiver(queue)
my_receiver.mysignal.connect(view.append_text)
my_receiver.moveToThread(qthread)
#
qthread.started.connect(partial(my_receiver.run,))
qthread.start()
return(qthread, my_receiver)
class Example(QMainWindow):
def __init__(self):
super().__init__()
self.initUI(self)
def restore(self):
# Restore sys.stdout
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
@pyqtSlot(str)
def append_text(self,text):
self.textEdit.moveCursor(QTextCursor.End)
self.textEdit.insertPlainText( text )
self.textEdit.moveCursor(QTextCursor.End)
def initUI(self, MainWindow):
# centralwidget
MainWindow.resize(346, 193)
self.centralwidget = QtWidgets.QWidget(MainWindow)
# The Action to quit
self.exitAction = QAction(QIcon('exit24.png'), 'Exit', self)
self.exitAction.setShortcut('Ctrl+Q')
self.exitAction.triggered.connect(self.close)
# The bar
self.statusBar()
self.menubar = self.menuBar()
self.fileMenu = self.menubar.addMenu('&File')
self.exitMenu=self.fileMenu.addAction(self.exitAction)
# tThe Button
self.btn_quit = QtWidgets.QPushButton(self.centralwidget)
self.btn_quit.setGeometry(QtCore.QRect(120, 20, 89, 25))
self.btn_quit.clicked.connect(lambda: self.doPrint() )
# The textEdit
self.textEdit = QtWidgets.QTextEdit(self.centralwidget)
self.textEdit.setGeometry(QtCore.QRect(10, 60, 321, 81))
# Show the frame
MainWindow.setCentralWidget(self.centralwidget)
self.show()
def doPrint(self):
# Test to print something.
print('TEST doPrint')
def closeEvent(self, event):
# Ask a question before to quit.
reply = QMessageBox.question(self, 'Message',
"Are you sure to quit?", QMessageBox.Yes |
QMessageBox.No, QMessageBox.No)
# Treat the answer.
if reply == QMessageBox.Yes:
self.restore()
event.accept()
else:
event.ignore()
def main():
queue = QueueStreamSetup()
app = QApplication(sys.argv)
ex = Example()
qthread, my_receiver = threadConnect(ex, queue)
return app, ex, queue, qthread, my_receiver
def finish(queue, qthread, my_receiver):
print('Finish')
my_receiver.runCondition=False
queue.close()
queue.join_thread()
qthread.terminate()
qthread.wait()
qthread.exit()
print('Finish Done')
if __name__ == '__main__':
app, ex, queue, qthread, my_receiver =main()
rc= app.exec_()
finish(queue, qthread, my_receiver)
print('the application ends with exit code {}'.format(rc))
sys.exit(rc)
Then the pytest file named test_GUI.py :
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os, sys
import pytest
from PyQt5 import QtGui, QtCore, QtWidgets, QtTest
from PyQt5.QtWidgets import *
from PyQt5.QtCore import QCoreApplication, Qt, QObject
import GUI
@pytest.fixture(scope="module")
def Viewer(request):
print(" SETUP GUI")
yield GUI.main()
print(" TEARDOWN GUI")
class Test_GUI_CXS() :
def test_launching(self, Viewer, qtbot, mocker, caplog):
# open the window and add the qtbot.
print(" SETUP Window")
app, window, queue, qthread, my_receiver = Viewer
qtbot.addWidget(window)
qtbot.wait_for_window_shown(window)
QtTest.QTest.qWait(0.5 *1000)
# Test
qtbot.mouseClick( window.btn_quit, QtCore.Qt.LeftButton )
QtTest.QTest.qWait(0.5 *1000)
# EXIT
mocker.patch.object(QMessageBox, 'question', return_value=QMessageBox.Yes)
window.exitAction.trigger()
QtTest.QTest.qWait(1 *1000)
assert window.close()
# Finish the processes.
print( "App END")
GUI.finish(queue, qthread, my_receiver)
print( "END TEST.")
So if I run the command: pytest -v -s ./test_GUI.py
, I get the following element in the messages:
Qt exceptions in virtual methods:
________________________________________________________________________________
Traceback (most recent call last):
File "xxx/GUI.py", line 25, in run
text = self.queue.get()
File "xxx/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError
I do not understand why there is this end of file error, but I assume that it is linked with the termination of the Queue
and the Qthread
.
Because until the Queue
is not empty, the my_receiver
can not stop running and so the Qthread
can not be terminate.
Unfortunately I found nothing about this kind of problem on the Internet.
Any suggestions or help on this case would be greatly appreciate.
The problem is that when you close the Queue the self.queue.get()
is still running, preventing the thread from finishing executing as it blocks while self.runCondition:
from executing. Considering the above a possible solution is to send a None and then just close the Queue:
class MyReceiver(QObject):
mysignal = pyqtSignal(str)
def __init__(self, queue, *args, **kwargs):
super(MyReceiver, self).__init__(*args, **kwargs)
self.queue = queue
self.runCondition = True
def run(self):
while self.runCondition:
text = self.queue.get()
if isinstance(text, str):
self.mysignal.emit(text)
def finish(queue, qthread, my_receiver):
print('Finish')
my_receiver.runCondition = False
queue.put(None)
qthread.quit()
qthread.wait()
qthread.exit()
queue.close()
queue.join_thread()
print('Finish Done')