I am currently working on a Raspberry Pi project where I am using an analog to digital converter (ADC), and in my code, I would like the ADC to receive information continuously and fill a list with the data it receives on one thread while another segment of code is running on a different thread. The second thread is running a function that checks for the detection of a magnet, once this magnet has been detected X number of times (X is a value defined by the user), I would like the code to stop the thread running the ADC and plot the data that the ADC has received. Once the plot has been shown, this cycle will repeat continuously until the user exits out of the program.
In my attempt at this, I created two functions within a class titled halleffectandadc, within this class there are three functions, of these four functions I would like adcreceiver() and halleffect_monitor() to run on two separate threads. As I mentioned above, I would like the adcreceiver() and the halleffect_monitor() functions to run concurrently. I have been able to run them at the same time, the issue arises when I would like to make there be a condition where if the halleffect_monitor() detects the desired amount of clock cycles it forces the adcreceiver() thread to stop regardless of how much of its list it has filled with data and plot the data using the plot_data function. I'm not quite sure how to implement this, and I thought using multithreading would be the best approach, I am just not sure how to implement conditions with threading. If there is another approach that may be more efficient I would love to hear it.
import spidev
import time
import numpy as np
import matplotlib.pyplot as plt
import RPi.GPIO as GPIO
import threading
class halleffectandadc:
def __init__(self, desired_clock_cycles, pin, clock_speed, buff_size):
self.spi = spidev.SpiDev()
self.spi.open(0, 0)
self.spi.max_speed_hz = clock_speed
self.buff_size = buff_size
self.desired_clock_cycles = desired_clock_cycles
GPIO.setmode(GPIO.BCM)
self.pin = pin
GPIO.setup(pin, GPIO.IN)
self.buff = np.ndarray((self.buff_size,), dtype="uint16")
def get_adc_val(self):
response = self.spi.xfer([0x00, 0x00])
return (response[0] << 8) + response[1]
def adcreceiver(self):
self.time_start = time.perf_counter()
i = 0
while i < self.buff_size:
self.buff[i] = self.get_adc_val()
i += 1
print(i)
if self.event.is_set():
print("The thread has been stopped to process data")
break
self.time_end = time.perf_counter()
def halleffect_monitor(self):
hall_effect_clock_count = 0
while hall_effect_clock_count < self.desired_clock_cycles:
print("Waiting....")
if GPIO.wait_for_edge(self.pin, GPIO.RISING):
hall_effect_clock_count += 1
print(f"Magnetic Field Detected, Number of Magnetic Fields Detected: {hall_effect_clock_count}")
def plot_data(self):
elapsed_time = self.time_end - self.time_start
time_points = np.linspace(0, elapsed_time, self.buff_size, endpoint=True)
self.buff[0] = self.buff[1]
self.buff[1] = self.buff[2]
plt.plot(time_points, self.buff / 65535 * 5)
plt.xlabel("Elapsed Time (s)", fontsize=12)
plt.title("Change in Potentiometer Wiper Voltage", fontsize=12)
plt.ylabel("Voltage (V)", fontsize=12)
plt.show()
print(
f"dT: {elapsed_time}s, STD: {np.std(self.buff):.2f}, MIN: {min(self.buff)}, MAX: {max(self.buff)}, AVG: {np.mean(self.buff):.2f}"
)
try:
adc1 = halleffectandadc(1, 17, 22000000, 100000)
Thread1 = threading.Thread(target=adc1.halleffect_monitor)
Thread2 = threading.Thread(target=adc1.adcreceiver, args=(event,))
Thread1.start()
Thread2.start()
except KeyboardInterrupt:
spi.close()
Update
The code (untested for obvious reasons) has been redone to use child processes rather than threads because I was having difficulty with handling KeyboardInterrupt exception events when I had threads waiting on threading.Event
instances. Perhaps somebody can explain why. Anyway, using processes can potentially improve performance since each process has its own GIL. But this requires now for the numpy
array, self.buff
and new attribute self.elapsed_time
to be stored in shared memory so that all processes can share this data.
If I understand your question, you want to repeatedly gather data and plot it periodically. We need careful synchronization between the two threads and for this we will need several events:
self.start_event
- This event is awaited by adcreceiver
in order to start receiving data.self.started_event
- This event is set by adcreceiver
to let halleffect_monitor
know that it has started receiving data.self.stop_event
- This event is set by halleffect_monitor
to tell adcreceiver
to stop processing in case it is still receiving data.self.stopped_event
- This event is set by adcreceiver
to let halleffect_monitor
know that it has stopped processing.start_plotting_event
- Tell the main thread to start plotting.self.plotting_completed_event
- Set by the main thread to let halleffect_monitor
that plotting has been completed.This seems like a lot of events, but I believe they are required so that events can be cleared for the next receiving/plotting cycle without worrying about any possible race conditions.
from multiprocessing import Process, Event, Array, Value
import numpy as np
import time
def np_array_to_shared_array(np_array):
shared_array = Array('B', np_array.nbytes, lock=False)
arr = np.frombuffer(shared_array, np_array.dtype)
arr[:] = np_array.flatten(order='C')
return shared_array
def shared_array_to_np_array(shared_array, shape, dtype):
return np.ndarray(shape, dtype=dtype, buffer=shared_array)
class halleffectandadc:
def __init__(self, desired_clock_cycles, pin, clock_speed, buff_size):
self.desired_clock_cycles = desired_clock_cycles
self.pin = pin
self.clock_speed = clock_speed
self.buff_size = buff_size
# Create np array in shared memory:
arr = np.ndarray((buff_size,), dtype="uint16")
shared_array = np_array_to_shared_array(arr)
self.buff = shared_array_to_np_array(shared_array, arr.shape, arr.dtype)
self.elapsed_time = Value('f', lock=False) # Shared memory floating point
self.start_event = Event()
self.started_event = Event()
self.stop_event = Event()
self.stopped_event = Event()
self.start_plotting_event = Event()
self.plotting_completed_event = Event()
def get_adc_val(self):
response = self.spi.xfer([0x00, 0x00])
return (response[0] << 8) + response[1]
def adcreceiver(self):
self.spi = spidev.SpiDev()
self.spi.open(0, 0)
self.spi.max_speed_hz = self.clock_speed
try:
while True:
# Wait for start event:
self.start_event.wait()
self.start_event.clear() # clear for next time
self.started_event.set() # show we have started
print("The process is starting to collect data", flush=True)
time_start = time.perf_counter()
i = 0
while i < self.buff_size and not self.stop_event.is_set():
self.buff[i] = self.get_adc_val()
i += 1
print(i, flush=True)
self.elapsed_time.value = time.perf_counter() - time_start
self.stopped_event.set() # show we have stopped
print("The process has been stopped to plot the data", flush=True)
except KeyboardInterrupt:
self.spi.close()
print('adcreceiver terminating', flush=True)
except Exception as e:
print(e, flush=True)
def halleffect_monitor(self):
GPIO.setmode(GPIO.BCM)
GPIO.setup(pin, GPIO.IN)
try:
while True:
# Zero out buffer:
self.buff.fill(0)
# clear events related to stopping
self.stop_event.clear()
self.stopped_event.clear()
# start the adcreceiver thread:
self.start_event.set()
# wait for adcreceiver to start:
self.started_event.wait()
# at this point the self.start_event has been cleared by adcreceiver
# so if adcreceiver stops because its buffer has been filled it will
# block until self.start_event is set again
hall_effect_clock_count = 0
while hall_effect_clock_count < self.desired_clock_cycles:
print("Waiting....", flush=True)
if GPIO.wait_for_edge(self.pin, GPIO.RISING):
hall_effect_clock_count += 1
print(f"Magnetic Field Detected, Number of Magnetic Fields Detected: {hall_effect_clock_count}", flush=True)
# stop the adcreceiver thread if it hasn't already stopped:
self.stop_event.set()
# wait for the adcreceiver thread to have stopped:
self.stopped_event.wait()
# plot the data:
self.plotting_completed_event.clear()
self.start_plotting_event.set()
self.plotting_completed_event.wait()
except KeyboardInterrupt:
print('halleffect_monitor terminating', flush=True)
except Exception as e:
print(e, flush=True)
def plot_data(self):
time_points = np.linspace(0, self.elapsed_time.value, self.buff_size, endpoint=True)
self.buff[0] = self.buff[1]
self.buff[1] = self.buff[2]
plt.plot(time_points, self.buff / 65535 * 5)
plt.xlabel("Elapsed Time (s)", fontsize=12)
plt.title("Change in Potentiometer Wiper Voltage", fontsize=12)
plt.ylabel("Voltage (V)", fontsize=12)
plt.show()
print(
f"dT: {elapsed_time}s, STD: {np.std(self.buff):.2f}, MIN: {min(self.buff)}, MAX: {max(self.buff)}, AVG: {np.mean(self.buff):.2f}"
)
def main():
try:
adc1 = halleffectandadc(1, 17, 22000000, 100000)
p1 = Process(target=adc1.halleffect_monitor)
p2 = Process(target=adc1.adcreceiver)
p1.start()
p2.start()
print('Type Ctrl-c to end up ...')
while True:
adc1.start_plotting_event.wait()
adc1.start_plotting_event.clear()
adc1.plot_data()
adc1.plotting_completed_event.set()
except KeyboardInterrupt:
pass
print('Waiting for processes to finish up ...')
p1.join()
p2.join()
if __name__ == '__main__':
main()
Multithreading Version
I believe I figured out a resolution to the problem with receiving KeyboardInterrupt
exceptions when waiting on a threading.Event
instance, which is to use instead a multiprocessing.Event
instance.
import threading
import multiprocessing
import numpy as np
import time
class halleffectandadc:
def __init__(self, desired_clock_cycles, pin, clock_speed, buff_size):
self.start_event = threading.Event()
self.started_event = threading.Event()
self.stop_event = threading.Event()
self.stopped_event = threading.Event()
# Note the use of a multiprocessing.Event:
self.start_plotting_event = multiprocessing.Event()
self.plotting_completed_event = threading.Event()
self.spi = spidev.SpiDev()
self.spi.open(0, 0)
self.spi.max_speed_hz = clock_speed
self.buff_size = buff_size
self.buff = np.ndarray((self.buff_size,), dtype="uint16")
self.desired_clock_cycles = desired_clock_cycles
self.pin = pin
GPIO.setmode(GPIO.BCM)
GPIO.setup(pin, GPIO.IN)
def get_adc_val(self):
response = self.spi.xfer([0x00, 0x00])
return (response[0] << 8) + response[1]
def adcreceiver(self):
while True:
# Wait for start event:
self.start_event.wait()
self.start_event.clear() # clear for next time
self.started_event.set() # show we have started
print("The thread is starting to collect data")
self.time_start = time.perf_counter()
i = 0
while i < self.buff_size and not self.stop_event.is_set():
self.buff[i] = self.get_adc_val()
i += 1
print(i)
self.time_end = time.perf_counter()
self.stopped_event.set() # show we have stopped
print("The thread has been stopped to plot the data")
def halleffect_monitor(self):
while True:
# start with an empty buffer for each new cycle:
self.buff.fill(0)
# clear events related to stopping
self.stop_event.clear()
self.stopped_event.clear()
# start the adcreceiver thread:
self.start_event.set()
# wait for adcreceiver to start:
self.started_event.wait()
# at this point the self.start_event has been cleared by adcreceiver
# so if adcreceiver stops because its buffer has been filled it will
# block until self.start_event is set again
hall_effect_clock_count = 0
while hall_effect_clock_count < self.desired_clock_cycles:
print("Waiting....")
if GPIO.wait_for_edge(self.pin, GPIO.RISING):
hall_effect_clock_count += 1
print(f"Magnetic Field Detected, Number of Magnetic Fields Detected: {hall_effect_clock_count}")
# stop the adcreceiver thread if it hasn't already stopped:
self.stop_event.set()
# wait for the adcreceiver thread to have stopped:
self.stopped_event.wait()
# plot the data:
self.plotting_completed_event.clear()
self.start_plotting_event.set()
self.plotting_completed_event.wait()
def plot_data(self):
elapsed_time = self.time_end - self.time_start
time_points = np.linspace(0, elapsed_time, self.buff_size, endpoint=True)
self.buff[0] = self.buff[1]
self.buff[1] = self.buff[2]
plt.plot(time_points, self.buff / 65535 * 5)
plt.xlabel("Elapsed Time (s)", fontsize=12)
plt.title("Change in Potentiometer Wiper Voltage", fontsize=12)
plt.ylabel("Voltage (V)", fontsize=12)
plt.show()
print(
f"dT: {elapsed_time}s, STD: {np.std(self.buff):.2f}, MIN: {min(self.buff)}, MAX: {max(self.buff)}, AVG: {np.mean(self.buff):.2f}"
)
try:
adc1 = halleffectandadc(1, 17, 22000000, 100000)
# Make these daemon threads:
threading.Thread(target=adc1.halleffect_monitor, daemon=True).start()
threading.Thread(target=adc1.adcreceiver, daemon=True).start()
print('Hit Ctrl-C to terminate ...\n')
while True:
adc1.start_plotting_event.wait()
adc1.start_plotting_event.clear()
adc1.plot_data()
adc1.plotting_completed_event.set()
except KeyboardInterrupt:
adc1.spi.close()