I have a class that does data acquisition, i.e. PySession.scope
in a separate thread. I want to plot each scope in a ScopeGUI
, which is based on PyQt5
. Obviously, the GUI's need to be running in the main thread, however I want to be able to invoke another PySession.scope
at any time, that is, the console has to be free to submit new commands.
To conclude: Let's say I start two scopes in parallel, while the data is being collected it should be visualized in two separate instances of the ScopeGUI
. Simultaneously, the console should be free to start an additional scope. What's the best practice so achieve this behavior in Python?
Here's a reproducible example of what I have so far:
.
├── main.py
├── scope.py
├── alosaclient.py
main.py
import os
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from alosaclient import PySession
import matplotlib.pyplot as plt
if __name__ == '__main__':
session = PySession(
"C:/ProjTMS/alosa1-client/test/config/linkcommand.toml")
variables = "speedcpu U_Z"
future1 = session.scope(
varnames=variables, nsamples=20, ndiv=1, realtime=True)
future2 = session.scope("m_1 m_u", nsamples=20, ndiv=1, realtime=True)
print("----------------------------------------------MAIN THREAD----------------------------------------------")
session.work()
result1 = future1.result()
print(result1.data[1].time)
print(result1.data[1].values)
result2 = future2.result()
print(result2.data[1].time)
print(result2.data[1].values)
scope.py
import numpy as np
import sys
from PyQt5 import QtWidgets
from matplotlib.backends.backend_qt5agg import (FigureCanvas,
NavigationToolbar2QT as
NavigationToolbar)
from matplotlib.figure import Figure
class ScopeGUI(QtWidgets.QMainWindow):
def __init__(self, varnames, nsamples):
super().__init__()
self.varnames = varnames
self.nsamples = nsamples
self.setEnabled(True)
self.setGeometry(0, 0, 800, 600)
self.setMinimumSize(800, 600)
self.main_widget = QtWidgets.QWidget()
self.setCentralWidget(self.main_widget)
self.main_layout = QtWidgets.QVBoxLayout()
self.main_widget.setLayout(self.main_layout)
self._fig = Figure(figsize=(8, 6))
self._canvas = FigureCanvas(self._fig)
self.main_layout.addWidget(self._canvas)
self._axes = self._fig.subplots()
self._axes.grid(True, which="both")
self._axes.set_xlabel('Time (s)')
self._axes.set_ylabel('Data (DSPu)')
self._axes.set_xlim(left=0, right=6.3)
self._axes.set_ylim(bottom=-1.5, top=1.5)
self.lines = []
self.initialize_lines()
self.addToolBar(NavigationToolbar(self._canvas, self))
self.show()
def initialize_lines(self):
variables = self.varnames.split()
for var in variables:
line, = self._axes.plot([], [])
line.set_marker('.')
self.lines.append(line)
def plot(self, scope_data):
print("plotting")
for signal, line in zip(scope_data, self.lines):
x = signal.time
y = signal.values
common_length = min(len(x), len(y))
line.set_xdata(x[:common_length])
line.set_ydata(y[:common_length])
self._canvas.draw()
self._canvas.flush_events()
alosaclient.py
import sys
import time
from concurrent.futures import ThreadPoolExecutor
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import random
from PyQt5 import QtWidgets
from scope import ScopeGUI
import asyncio
app = QtWidgets.QApplication(sys.argv)
class ScopeDataElement:
def __init__(self, variable_index, time, values):
self.variable_index = variable_index
self.time = time
self.values = values
class Scope:
def __init__(self, data):
self.data = data
class PySession:
def __init__(self, lcmdfile):
self.pool = ThreadPoolExecutor(8)
self.loop = asyncio.get_event_loop()
self.tasks = list()
self.scope_buffer = list()
def work(self):
self.loop.run_until_complete(asyncio.wait(self.tasks))
def scope(self, varnames, nsamples=1, ndiv=1, realtime=False):
future = self.pool.submit(
self.dummy_scope, varnames, nsamples, ndiv)
if realtime:
scope_gui = ScopeGUI(varnames, nsamples)
task = self.loop.create_task(self.update(scope_gui, varnames))
self.tasks.append(task)
return future
async def update(self, scope_gui, varnames):
variable_indices = [self.name_to_index(
var) for var in varnames.split()]
# find corresponding scope_buffer is it may potentially grow dynamically
scope_index = self.find_scope_index(variable_indices)
# as long as empty, wait
while not self.scope_buffer:
await asyncio.sleep(25e-3)
# while the data is not complete, update to GUI
while not all([len(signal.time) == scope_gui.nsamples for signal in self.scope_buffer[scope_index].data]):
scope_gui.plot(self.scope_buffer[scope_index].data)
await asyncio.sleep(25e-3)
@staticmethod
def name_to_index(varname):
# dummy cross reference table: get index from variable name
varnames = ["speedcpu", "U_Z", "m_1", "m_u"]
return varnames.index(varname)
def find_scope_index(self, variable_indices):
# get scope index from variable_indices, may change if scopes run parallel
result = filter(lambda pair: all([signal.variable_index == varindex for varindex, signal in zip(
variable_indices, pair[1].data)]), enumerate(self.scope_buffer))
index = list(result)[0][0]
return index
def find_data_index(self, scope, varname):
result = filter(lambda pair: self.name_to_index(varname) ==
pair[1].variable_index, enumerate(scope.data))
index = list(result)[0][0]
return index
def dummy_scope(self, varnames, nsamples, ndiv):
variables = varnames.split()
variable_indices = [self.name_to_index(
var) for var in variables]
content = [ScopeDataElement(self.name_to_index(
var), list(), list()) for var in variables]
scope = Scope(content)
self.scope_buffer.append(scope)
for var in variables:
scope_index = self.find_scope_index(variable_indices)
data_index = self.find_data_index(
self.scope_buffer[scope_index], var)
linspace = np.linspace(0, 2*np.pi, nsamples)
for arrayidx, point in enumerate(linspace):
print(f"scope index is {scope_index}")
print(f"data index is {data_index}")
self.scope_buffer[scope_index].data[data_index].time.append(
point)
self.scope_buffer[scope_index].data[data_index].values.append(
np.sin(point) + random.uniform(0, 0.2))
time.sleep(10e-3)
return self.scope_buffer[scope_index]
Essentially, this code does the following:
Start scope (PySession.scope
)
Process event loop (PySession.work
)
Issues:
IMPORTANT
I'm working with the Python interactive console, that's why there is no app.exec_()
command. The reproducible example has the be started with python3 -i main.py to start on the same page as I.
I am fully aware that the way I'm trying to do this is probably dead wrong, that's why I'm asking you guys for help.
Thanks in advance!
Here's how it's done, no more asyncio
needed:
The update function needs to be adapted as follows. Move the canvas.draw()
from the GUI to the update
routine to avoid flickering of the figure.
def update(self, scope_gui, varnames):
try:
variable_indices = [self.name_to_index(
var) for var in varnames.split()]
# find corresponding scope_buffer is it may potentially grow dynamically
scope_index = self.find_scope_index(variable_indices)
done = False
while not self.scope_buffer:
time.sleep(10e-3)
# while the data is not complete, update to GUI
while not all([len(signal.time) == scope_gui.nsamples for signal in self.scope_buffer[scope_index].data]):
scope_gui.plot(self.scope_buffer[scope_index].data)
scope_gui._canvas.draw()
time.sleep(25e-3)
except Exception as e:
print(f"EXCEPTION OCURRED: {e}")
raise(e)
Accordingly, the scope gets adapted as well.
def scope(self, varnames, nsamples=1, ndiv=1, realtime=False):
future = self.pool.submit(
self.dummy_scope, varnames, nsamples, ndiv)
if realtime:
scope_gui = ScopeGUI(varnames, nsamples)
self.pool.submit(self.update, scope_gui, varnames)
return future
This ensures that the GUI runs in the main thread, whereas the updating task is running in a separate thread and therefore does not block the console.