I have seen other questions on the topic on this forum, but none have helped me understand how to deal with this. Most of them also seem to me to be about quite intricated and long code. I believe I am doing something rather simple / would like to do something rather simple. I hope someone can help! Here below extensive explanations and then my current code.
NOTE: please do not delete this question. I have given the following a lot of thought and I have carefully gone through related threads, but to no avail. I also believe it makes sense to post this because it is partly related to a more generic question: that of how to plot in real-time while having callbacks running in the background (see summary at the end), which could be summarized to be my general goal.
Setup and goal: National Instruments acquisition module (this matters litlle) NI cDAQ9178, interfaced via nidaqmx-python
, a package maintained by NI with documentation here. Some analog signal is inputed in there, and the goal is to acquire it continuously (until I decide to stop the acquisition) at a certain sampling rate (approximately 1000 Hz) while plotting the signal in real time. The plotting needs not be refreshed nearly so often (10Hz refresh rate would even be fine). I'm using Windows 10 with Python 3.7 in a conda virtual environment, and editing is done in PyCharm. Ideally things should work both in PyCharm and any terminal.
Situation: nidaqmx-python
provides high-level functions that allow one to register callbacks (which one defines as one wishes), which are called everytime a certain number of samples (in my case 100, but that's not strict) fills the PC buffer. The idea is that the callback, defined below, reads the buffer at that point, and does something (in my case some low-pass filtering, which I have taken out for conciseness, some storing into a global variable data
, and maybe plotting - see below).
Problem: I have been fooling around with having whatever plots the data in real time be included in the callback, but with matplotlib that is a nightmare because the callback uses threads other than the main one, and matplotlib does not like to be called from anywhere outside the main thread. I've googled the heck out of other libraries optimized for real-time plotting (and, I was thinking, hopefully thread safe) but it's not so easy: I cannot get vispy to work and I cannot get pyqtgraph to even install, just to give you some examples. Then I saw several posts on the internet of people actually managing pretty decent real-time animations with matplotlib, despite it having been developped with publication in mind and not these applications; so I thought let's give it a go.
My take: Since I could not have matplotlib do the work from inside the callback, I did the following (which is the code you see below): after the callback and after the task is started with task.start()
(that's specific to nidaqmx-python
), I just create a while
loop which plots the global variable buffer
. I thought it was a nice trick: see, buffer
is updated (call it that) by the callback every 0.1 seconds or so (does not matter) and, on the side, the while
loop is plotting the buffer
variable over and over, erasing everytime before plotting, effectively yielding a real-time like plot.
NOTE: I am perfectly aware the plotting part is not nearly as good as it could be made (I probably should use the ax API of matplotlib and the subplots
, not to mention animation), but I do not care for the moment. I'll deal with that later and refine it to make it more efficient.
What I want: this actually does what I want ... except, in order to stop it, I introduced the try:
and except:
statements around the while
loop, as you see in the code below. Naturally, pressing CTRL+C
does break the loop ... but it then also breaks the whole running script and leaves me with the following error: forrtl: error (200): program aborting due to control-C event
, in PyCharm, and the following precision when run from a terminal:
Image PC Routine Line Source
libifcoremd.dll 00007FFECF413B58 Unknown Unknown Unknown
KERNELBASE.dll 00007FFF219F60A3 Unknown Unknown Unknown
KERNEL32.DLL 00007FFF23847BD4 Unknown Unknown Unknown
ntdll.dll 00007FFF240CCED1 Unknown Unknown Unknown
QObject::~QObject: Timers cannot be stopped from another thread
The inconvenience is that I then have no choice but to close the python shell (thinking PyCharm again), and I do not have access to my precious variable data
, containing ... well, my data.
Guess: obviously, the callback does not like to be stopped in this fahsion. The nidaqmx_python
task should be stopped with task.stop()
. I try putting task.stop()
right after the KeyboardInterrupt except:
, but it does not help, since CTRL+C
stops the script on top / instead of breaking the while loop. I believe some more sofisticated method of stopping my task is required. I have been thinking about this for days but can't figure out a way of having both things: a task I can stop, and at the same time real-time plotting. Note that, without the plotting, it is easy to stop the task upon ENTER
keypress: one simply writes at the end
input('Press ENTER to stop task')
task.stop()
But of course simply doing the above does not allow me to include the real-time plotting part.
Summary: I could not call matplotlib from the callback which reads the data continuously, so I wrote a while
loop for real-time plotting in a separate block, but then I see no way of stopping that while
loop without getting the above error (which complains that the callback was stop from a different thread, I think).
I hope I am being clear and if not, please do ask!
Code: I've cleaned it to get it down to as close as can be to an MWE that shows the problem, although of course I realize most of you don't have an NI daq to play around and connect so as to be able to run this. Anyway ... here it is:
import matplotlib.pyplot as plt
import numpy as np
import nidaqmx
from nidaqmx import stream_readers
from nidaqmx import constants
sfreq = 1000
bufsize = 100
with nidaqmx.Task() as task:
# Here we set up the task ... nevermind
task.ai_channels.add_ai_voltage_chan("cDAQ2Mod1/ai1")
task.timing.cfg_samp_clk_timing(rate=sfreq, sample_mode=constants.AcquisitionType.CONTINUOUS,
samps_per_chan=bufsize)
# Here we define a stream to be read continuously
stream = stream_readers.AnalogMultiChannelReader(task.in_stream)
data = np.zeros((1, 0)) # initializing an empty numpy array for my total data
buffer = np.zeros((1, bufsize)) # defined so that global buffer can be written to by the callback
# This is my callback to read data continuously
def reading_task_callback(task_idx, event_type, num_samples, callback_data): # bufsize is passed to num_samples when this is called
global data
global buffer
buffer = np.zeros((1, num_samples))
# This is the reading part
stream.read_many_sample(buffer, num_samples, timeout=constants.WAIT_INFINITELY)
data = np.append(data, buffer, axis=1) # appends buffered data to variable data
return 0 # Absolutely needed for this callback to be well defined (see nidaqmx doc).
# Here is the heavy lifting I believe: the above callback is registered
task.register_every_n_samples_acquired_into_buffer_event(bufsize, reading_task_callback)
task.start() # The task is started (callback called periodically)
print('Acquiring sensor data. Press CTRL+C to stop the run.\n') # This should work ...
fig = plt.figure()
try:
while True:
# Poor's man plot updating
plt.clf()
plt.plot(buffer.T)
plt.show()
plt.pause(0.01) # 100 Hz refresh rate
except KeyboardInterrupt: # stop loop with CTRL+C ... or so I thought :-(
plt.close(fig)
pass
task.stop() # I believe I never get to this part after pressing CTRL+C ...
# Some prints at the end ... nevermind
print('Total number of acquired samples: ', len(data.T),'\n')
print('Sampling frequency: ', sfreq, 'Hz\n')
print('Buffer size: ', bufsize, '\n')
print('Acquisition duration: ', len(data.T)/sfreq, 's\n')
Any input would be appreciated. Thank you in advance folks!
EDIT: after the accepted answer here below, I rewrote the code above and came up with the following, which works as intended now (sorry, this time I have not cleaned it up, and some lines are irrelevant for the present question):
# Stream read from a task that is set up to read continuously
import matplotlib.pyplot as plt
import numpy as np
import nidaqmx
from nidaqmx import stream_readers
from nidaqmx import constants
from scipy import signal
import threading
running = True
sfreq = 1000
bufsize = 100
bufsizeb = 100
global task
def askUser(): # it might be better to put this outside of task
global running
input("Press return to stop.")
running = False
def main():
global running
global data
global buffer
global data_filt
global buffer_filt
global b
global z
print('Acquiring sensor data...')
with nidaqmx.Task() as task: # maybe we can use target as above
thread = threading.Thread(target=askUser)
thread.start()
task.ai_channels.add_ai_voltage_chan("cDAQ2Mod1/ai1")
task.timing.cfg_samp_clk_timing(rate=sfreq, sample_mode=constants.AcquisitionType.CONTINUOUS,
samps_per_chan=bufsize)
# unclear samps_per_chan is needed here above or why it would be different than bufsize
stream = stream_readers.AnalogMultiChannelReader(task.in_stream)
data = np.zeros((1, 0)) # probably not the most elegant way of initializing an empty numpy array
buffer = np.zeros((1, bufsizeb)) # defined so that global buffer can be written in the callback
data_filt = np.zeros((1, 0)) # probably not the most elegant way of initializing an empty numpy array
buffer_filt = np.zeros((1, bufsizeb)) # defined so that global buffer can be written in the callback
b = signal.firwin(150, 0.004)
z = signal.lfilter_zi(b, 1)
def reading_task_callback(task_idx, event_type, num_samples, callback_data): # bufsizeb is passed to num_samples
global data
global buffer
global data_filt
global buffer_filt
global z
global b
if running:
# It may be wiser to read slightly more than num_samples here, to make sure one does not miss any sample,
# see: https://documentation.help/NI-DAQmx-Key-Concepts/contCAcqGen.html
buffer = np.zeros((1, num_samples))
stream.read_many_sample(buffer, num_samples, timeout=constants.WAIT_INFINITELY)
data = np.append(data, buffer, axis=1) # appends buffered data to variable data
# IIR Filtering, low-pass
buffer_filt = np.zeros((1, num_samples))
for i, x in enumerate(np.squeeze(buffer)): # squeeze required for x to be just a scalar (which lfilter likes)
buffer_filt[0,i], z = signal.lfilter(b, 1, [x], zi=z)
data_filt = np.append(data_filt, buffer_filt, axis=1) # appends buffered filtered data to variable data_filt
return 0 # Absolutely needed for this callback to be well defined (see nidaqmx doc).
task.register_every_n_samples_acquired_into_buffer_event(bufsizeb, reading_task_callback) # bufsizeb instead
task.start()
while running: # this is perfect: it "stops" the console just like sleep in a way that the task does not stop
plt.clf()
plt.plot(buffer.T)
plt.draw()
plt.pause(0.01) # 100 Hz refresh rate
# plt.close(fig) # maybe no need to close it for now
# task.join() # this is for threads I guess ... (seems useless to my case?)
# Some prints at the end ...
print('Total number of acquired samples:', len(data.T))
print('Sampling frequency:', sfreq, 'Hz')
print('Buffer size:', bufsize)
print('Acquisition duration:', len(data.T)/sfreq, 's')
if __name__ == '__main__':
main()
Note that I do not need a task.stop()
after all because the way continuous acquisition tasks work with this package is that reading any line of code after task.start()
which is not a sleep
or something like that makes the task stop (well that's my understanding at least).
The first thing I did was get rid of the keyboard interrupt loop. I replaced it with a global variable running
, and another thread that sets the variable to False
when returned from.
def askUser():
global running
input("Press return to stop.")
running = False
Then, before the while loop
, created a new thread that will execute this function.
askUserThread = threading.Thread(target=askUser)
askUserThread.start()
And for the while loop, getting rid of the try
catch
statement:
while running:
plt.clf()
plt.plot(buffer.T)
plt.draw() # Note: this got changed because .show wasn't working.
plt.pause(0.01)
This still didn't work for me because I had to close the plot window for a new one to show up. So from this answer, I changed it from .show
to .draw
.
My end code was a little different (since I sampled random data) but here it is.
# sampling.py
# by Preston Hager
import matplotlib.pyplot as plt
import numpy as np
import threading
sfreq = 1000
bufsize = 100
running = True
data = np.zeros((1, 0)) # initializing an empty numpy array for my total data
buffer = np.zeros((1, bufsize)) # defined so that global buffer can be written to by the callback
def askUser():
global running
input("Press return to stop.")
running = False
def readingTask():
global data
global buffer
while running:
buffer = np.random.rand(1, bufsize)
# This is the reading part
data = np.append(data, buffer, axis=1) # appends buffered data to variable data
def main():
global running
print('Acquiring sensor data.')
thread = threading.Thread(target=askUser)
thread.start()
task = threading.Thread(target=readingTask)
task.start()
fig = plt.figure()
while running:
# Poor's man plot updating
plt.clf()
plt.plot(buffer.T)
plt.draw()
plt.pause(0.01) # 100 Hz refresh rate
plt.close(fig)
task.join()
# Some prints at the end ... nevermind
print('Total number of acquired samples:', len(data.T))
print('Sampling frequency:', sfreq, 'Hz')
print('Buffer size:', bufsize)
print('Acquisition duration:', len(data.T)/sfreq, 's')
if __name__ == '__main__':
main()