Search code examples
pythonpython-3.xpyqt5python-multithreading

Data acquisition and plotting in separate threads


I have a class that does data acquisition in a separate thread (PySession.scope). I want to plot each scope in a ScopeGUI, which is based on PyQt5. Obviously, the GUIs need to run in the main thread; however, I want to be able to invoke another PySession.scope at any time, that is, the console has to stay free to submit new commands.

To conclude: Let's say I start two scopes in parallel; while the data is being collected, it should be visualized in two separate instances of the ScopeGUI. Simultaneously, the console should be free to start an additional scope. What's the best practice to achieve this behavior in Python?

Here's a reproducible example of what I have so far:

.
├── main.py
├── scope.py
├── alosaclient.py

main.py

import os
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from alosaclient import PySession
import matplotlib.pyplot as plt

if __name__ == '__main__':
    # Demo entry point: start two dummy acquisitions with live plotting,
    # run the queued GUI update tasks to completion, then print the results.
    session = PySession(
        "C:/ProjTMS/alosa1-client/test/config/linkcommand.toml")
    variables = "speedcpu U_Z"
    # Each scope() call returns the future of an acquisition submitted to the
    # session's thread pool; realtime=True additionally opens a ScopeGUI and
    # queues an update task for it.
    future1 = session.scope(
        varnames=variables, nsamples=20, ndiv=1, realtime=True)
    future2 = session.scope("m_1 m_u", nsamples=20, ndiv=1, realtime=True)
    print("----------------------------------------------MAIN THREAD----------------------------------------------")
    # Blocks this thread until all queued GUI update tasks have finished.
    session.work()
    # result() blocks until the acquisition has returned its Scope object.
    result1 = future1.result()
    # data[1] is the second variable requested for the scope (here U_Z).
    print(result1.data[1].time)
    print(result1.data[1].values)
    result2 = future2.result()
    # Second variable of the second scope (here m_u).
    print(result2.data[1].time)
    print(result2.data[1].values)

scope.py

import numpy as np
import sys
from PyQt5 import QtWidgets
from matplotlib.backends.backend_qt5agg import (FigureCanvas,
                                                NavigationToolbar2QT as
                                                NavigationToolbar)
from matplotlib.figure import Figure


class ScopeGUI(QtWidgets.QMainWindow):
    """Window showing one live matplotlib line per scoped variable.

    The window shows itself at the end of construction; new samples are
    pushed into it through :meth:`plot`.
    """

    def __init__(self, varnames, nsamples):
        """Build the window, the embedded figure and one empty line per name.

        Args:
            varnames: Whitespace-separated variable names; one line is
                created for each name.
            nsamples: Number of samples expected per variable. It is only
                stored on the instance; this class does not use it itself.
        """
        super().__init__()
        self.varnames = varnames
        self.nsamples = nsamples
        self.setEnabled(True)
        self.setGeometry(0, 0, 800, 600)
        self.setMinimumSize(800, 600)

        self.main_widget = QtWidgets.QWidget()
        self.setCentralWidget(self.main_widget)

        self.main_layout = QtWidgets.QVBoxLayout()
        self.main_widget.setLayout(self.main_layout)

        self._fig = Figure(figsize=(8, 6))
        self._canvas = FigureCanvas(self._fig)
        self.main_layout.addWidget(self._canvas)

        self._axes = self._fig.subplots()
        self._axes.grid(True, which="both")
        self._axes.set_xlabel('Time (s)')
        self._axes.set_ylabel('Data (DSPu)')
        # Fixed axis limits; presumably sized for the dummy sine data
        # (0..2*pi on x, roughly -1..1.2 on y) -- adjust for real signals.
        self._axes.set_xlim(left=0, right=6.3)
        self._axes.set_ylim(bottom=-1.5, top=1.5)
        self.lines = []
        self.initialize_lines()

        self.addToolBar(NavigationToolbar(self._canvas, self))
        self.show()

    def initialize_lines(self):
        """Create one empty, dot-marked line per variable name."""
        for _ in self.varnames.split():
            line, = self._axes.plot([], [])
            line.set_marker('.')
            self.lines.append(line)

    def plot(self, scope_data):
        """Update every line from ``scope_data`` and redraw the canvas once.

        Args:
            scope_data: Iterable of objects exposing ``time`` and ``values``
                sequences, in the same order in which the lines were
                created. ``zip`` stops at the shorter of the two, so surplus
                signals or surplus lines are left untouched.
        """
        print("plotting")
        for signal, line in zip(scope_data, self.lines):
            x = signal.time
            y = signal.values
            # time and values may be appended separately by the producer, so
            # their lengths can differ momentarily; plot only complete pairs.
            common_length = min(len(x), len(y))
            line.set_xdata(x[:common_length])
            line.set_ydata(y[:common_length])
        # Redraw once per update instead of once per line: a draw re-renders
        # the whole figure, so drawing inside the loop only repeats work.
        self._canvas.draw()
        self._canvas.flush_events()

alosaclient.py

import sys
import time
from concurrent.futures import ThreadPoolExecutor
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import random
from PyQt5 import QtWidgets
from scope import ScopeGUI
import asyncio

# Module-level side effect: the Qt application object is created as soon as
# this module is imported, i.e. before PySession.scope constructs any
# ScopeGUI window. The code shown here never calls app.exec_().
app = QtWidgets.QApplication(sys.argv)


class ScopeDataElement:
    """Samples recorded for a single scoped variable.

    Stores the variable's index together with its ``time`` and ``values``
    sequences. The sequences are kept by reference, not copied.
    """

    def __init__(self, variable_index, time, values):
        # Note: the ``time`` parameter shadows the ``time`` module inside
        # this method only.
        self.variable_index, self.time, self.values = (
            variable_index, time, values)


class Scope:
    """Container for the data of one scope run."""

    def __init__(self, data):
        # In this file ``data`` is a list of ScopeDataElement objects, one per
        # requested variable (built in PySession.dummy_scope). Kept by
        # reference, not copied.
        self.data = data


class PySession:
    """Dummy acquisition session with optional live plotting.

    Acquisitions run in a thread pool and append to ``scope_buffer``; for
    ``realtime`` scopes a GUI update coroutine is queued on the asyncio event
    loop and executed by :meth:`work`.
    """

    def __init__(self, lcmdfile):
        """Create the worker pool, the event loop handle and the buffers.

        Args:
            lcmdfile: Path to a link-command file. Accepted for interface
                compatibility; this dummy implementation does not read it.
        """
        self.pool = ThreadPoolExecutor(8)
        self.loop = asyncio.get_event_loop()
        self.tasks = list()
        self.scope_buffer = list()

    def work(self):
        """Run the event loop until every queued GUI update task is done.

        Blocks the calling thread. Returns immediately when nothing has been
        queued.
        """
        if not self.tasks:
            # asyncio.wait() rejects an empty collection with ValueError.
            return
        self.loop.run_until_complete(asyncio.wait(self.tasks))

    def scope(self, varnames, nsamples=1, ndiv=1, realtime=False):
        """Start a dummy acquisition and, optionally, a live plot for it.

        Args:
            varnames: Whitespace-separated variable names to record.
            nsamples: Number of samples to record per variable.
            ndiv: Forwarded to :meth:`dummy_scope`, which does not use it.
            realtime: When true, open a ScopeGUI and queue an update task
                that :meth:`work` will run.

        Returns:
            The future of the acquisition; its result is the filled Scope.
        """
        future = self.pool.submit(
            self.dummy_scope, varnames, nsamples, ndiv)
        if realtime:
            scope_gui = ScopeGUI(varnames, nsamples)
            task = self.loop.create_task(self.update(scope_gui, varnames))
            self.tasks.append(task)
        return future

    async def update(self, scope_gui, varnames):
        """Refresh ``scope_gui`` until its scope holds ``nsamples`` samples.

        Args:
            scope_gui: Window to update; must expose ``nsamples`` and
                ``plot(data)``.
            varnames: Whitespace-separated variable names of the scope.

        Raises:
            ValueError: If a name is unknown to :meth:`name_to_index`.
        """
        variable_indices = [self.name_to_index(var)
                            for var in varnames.split()]
        # The acquisition thread may not have registered its scope yet, so
        # retry the lookup instead of failing on the first miss.
        while True:
            try:
                scope_index = self.find_scope_index(variable_indices)
                break
            except IndexError:
                await asyncio.sleep(25e-3)
        # scope_buffer is only ever appended to, so this entry stays valid.
        scope = self.scope_buffer[scope_index]
        # While the data is not complete, push the current state to the GUI.
        while not all(len(signal.time) == scope_gui.nsamples
                      for signal in scope.data):
            scope_gui.plot(scope.data)
            await asyncio.sleep(25e-3)
        # Samples that arrived after the last refresh would otherwise never
        # be drawn: plot the completed data once more.
        scope_gui.plot(scope.data)

    @staticmethod
    def name_to_index(varname):
        """Return the index of ``varname`` in the dummy cross-reference table.

        Raises:
            ValueError: If ``varname`` is not in the table.
        """
        varnames = ["speedcpu", "U_Z", "m_1", "m_u"]
        return varnames.index(varname)

    def find_scope_index(self, variable_indices):
        """Return the position of the first matching scope in the buffer.

        A scope matches when its signals carry exactly ``variable_indices``,
        in the same order and with the same length. The length is compared
        too, so a scope is not matched by a mere prefix of its variables.

        Raises:
            IndexError: If no buffered scope matches.
        """
        wanted = list(variable_indices)
        for index, scope in enumerate(self.scope_buffer):
            if [signal.variable_index for signal in scope.data] == wanted:
                return index
        raise IndexError(
            f"no scope registered for variable indices {wanted}")

    def find_data_index(self, scope, varname):
        """Return the position of ``varname``'s signal inside ``scope.data``.

        Raises:
            ValueError: If ``varname`` is unknown to :meth:`name_to_index`.
            IndexError: If the scope holds no signal for ``varname``.
        """
        wanted = self.name_to_index(varname)
        for index, signal in enumerate(scope.data):
            if signal.variable_index == wanted:
                return index
        raise IndexError(f"scope holds no signal for variable {varname!r}")

    def dummy_scope(self, varnames, nsamples, ndiv):
        """Record a noisy sine for every variable and return the Scope.

        The scope is appended to ``scope_buffer`` before any sample is
        produced. Samples are then appended one by one with a 10 ms pause,
        one variable after the other.

        Args:
            varnames: Whitespace-separated variable names.
            nsamples: Number of samples per variable.
            ndiv: Unused by this dummy implementation.

        Returns:
            The Scope object that was filled.
        """
        variables = varnames.split()
        content = [ScopeDataElement(self.name_to_index(var), list(), list())
                   for var in variables]
        scope = Scope(content)
        self.scope_buffer.append(scope)
        # Work on the scope created above instead of searching the buffer by
        # variable indices, which could pick up another scope recording the
        # same variables. The index is only needed for the log output.
        scope_index = self.scope_buffer.index(scope)
        linspace = np.linspace(0, 2*np.pi, nsamples)
        for var in variables:
            data_index = self.find_data_index(scope, var)
            element = scope.data[data_index]
            for point in linspace:
                print(f"scope index is {scope_index}")
                print(f"data index is {data_index}")
                element.time.append(point)
                element.values.append(
                    np.sin(point) + random.uniform(0, 0.2))
                time.sleep(10e-3)
        return scope


Essentially, this code does the following:

  1. Start scope (PySession.scope)

    • trigger two dummy scope functions to gather some dummy data
    • push the corresponding GUI update task onto the event loop
  2. Process event loop (PySession.work)

    • As soon as all desired scopes have been initialized, the event loop is processed, i.e. the GUIs are updated

Issues:

  • As long as the event loop runs, the console is blocked and no more commands can be submitted.

IMPORTANT: I'm working with the Python interactive console, which is why there is no app.exec_() call. The reproducible example has to be started with python3 -i main.py to reproduce my setup.

I am fully aware that the way I'm trying to do this is probably dead wrong, that's why I'm asking you guys for help.

Thanks in advance!


Solution

  • Here's how it's done, no more asyncio needed:

    The update function needs to be adapted as follows. Move the canvas.draw() from the GUI to the update routine to avoid flickering of the figure.

        def update(self, scope_gui, varnames):
            """Refresh ``scope_gui`` until its scope holds ``nsamples`` samples.

            Intended to be submitted to the thread pool; it blocks its
            thread with ``time.sleep`` between refreshes.

            Args:
                scope_gui: Window to update; must expose ``nsamples``,
                    ``plot(data)`` and ``_canvas.draw()``.
                varnames: Whitespace-separated variable names of the scope.

            Raises:
                Exception: Any error is printed and then re-raised.
            """
            # NOTE(review): this drives the canvas from a worker thread;
            # confirm that is acceptable for the Qt backend in use.
            try:
                variable_indices = [self.name_to_index(var)
                                    for var in varnames.split()]
                # The acquisition thread may not have registered its scope
                # yet; find_scope_index raises IndexError until it has, so
                # retry instead of failing on the first miss.
                while True:
                    try:
                        scope_index = self.find_scope_index(variable_indices)
                        break
                    except IndexError:
                        time.sleep(10e-3)
                scope = self.scope_buffer[scope_index]
                # While the data is not complete, push it to the GUI.
                while not all(len(signal.time) == scope_gui.nsamples
                              for signal in scope.data):
                    scope_gui.plot(scope.data)
                    scope_gui._canvas.draw()
                    time.sleep(25e-3)
                # Samples that arrived after the last refresh would never be
                # shown otherwise: draw the completed data once more.
                scope_gui.plot(scope.data)
                scope_gui._canvas.draw()
            except Exception as e:
                # When the future returned by pool.submit is discarded, this
                # print is the only place the error becomes visible.
                print(f"EXCEPTION OCURRED: {e}")
                raise
    

    Accordingly, the scope gets adapted as well.

        def scope(self, varnames, nsamples=1, ndiv=1, realtime=False):
            """Submit a dummy acquisition and optionally a live-plot updater.

            Both jobs go to the session's thread pool. Only the future of the
            acquisition is returned; the updater's future is discarded.
            """
            acquisition = self.pool.submit(
                self.dummy_scope, varnames, nsamples, ndiv)
            if not realtime:
                return acquisition
            # The window is built in the calling thread; only the refresh
            # loop is handed to the pool.
            window = ScopeGUI(varnames, nsamples)
            self.pool.submit(self.update, window, varnames)
            return acquisition
    

    This ensures that the GUI runs in the main thread, whereas the updating task is running in a separate thread and therefore does not block the console.