Tags: python, multithreading, raspberry-pi, adc

Multithreading with Specified Condition in Python


I am working on a Raspberry Pi project that uses an analog-to-digital converter (ADC). I would like one thread to read from the ADC continuously and fill a list with the data it receives, while a second thread runs a function that checks for the detection of a magnet. Once the magnet has been detected X times (X is a value defined by the user), the code should stop the thread running the ADC and plot the data the ADC has received. Once the plot has been shown, the cycle should repeat until the user exits the program.

In my attempt, I created a class titled halleffectandadc. Of its methods, I would like adcreceiver() and halleffect_monitor() to run concurrently on two separate threads. I have been able to run them at the same time. The issue arises when I try to add a condition: once halleffect_monitor() has detected the desired number of clock cycles, it should force the adcreceiver() thread to stop, regardless of how much of its list has been filled, and the data should then be plotted using the plot_data function. I thought multithreading would be the best approach, but I am not sure how to implement conditions with threading. If another approach would be more efficient, I would love to hear it.

import spidev
import time
import numpy as np
import matplotlib.pyplot as plt
import RPi.GPIO as GPIO
import threading 

class halleffectandadc:
    def __init__(self, desired_clock_cycles, pin, clock_speed, buff_size):
        self.spi = spidev.SpiDev()
        self.spi.open(0, 0)
        self.spi.max_speed_hz = clock_speed
        self.buff_size = buff_size
        self.desired_clock_cycles = desired_clock_cycles
        GPIO.setmode(GPIO.BCM)
        self.pin = pin
        GPIO.setup(pin, GPIO.IN)
        self.buff = np.ndarray((self.buff_size,), dtype="uint16")

    def get_adc_val(self):
        response = self.spi.xfer([0x00, 0x00])
        return (response[0] << 8) + response[1]

    def adcreceiver(self):
        

        self.time_start = time.perf_counter()

        i = 0

        while i < self.buff_size:
            self.buff[i] = self.get_adc_val()
            i += 1
            print(i)
            if self.event.is_set():
                    print("The thread has been stopped to process data")
                    break
        self.time_end = time.perf_counter()

    def halleffect_monitor(self):
        hall_effect_clock_count = 0
        while hall_effect_clock_count < self.desired_clock_cycles:
                print("Waiting....")
                if GPIO.wait_for_edge(self.pin, GPIO.RISING):
                        hall_effect_clock_count += 1
                        print(f"Magnetic Field Detected, Number of Magnetic Fields Detected: {hall_effect_clock_count}")       
                             
    def plot_data(self):

        elapsed_time = self.time_end - self.time_start

        time_points = np.linspace(0, elapsed_time, self.buff_size, endpoint=True)
        self.buff[0] = self.buff[1]
        self.buff[1] = self.buff[2]
        plt.plot(time_points, self.buff / 65535 * 5)

        plt.xlabel("Elapsed Time (s)", fontsize=12)
        plt.title("Change in Potentiometer Wiper Voltage", fontsize=12)
        plt.ylabel("Voltage (V)", fontsize=12)
        plt.show()
        print(
            f"dT: {elapsed_time}s, STD: {np.std(self.buff):.2f}, MIN: {min(self.buff)}, MAX: {max(self.buff)}, AVG: {np.mean(self.buff):.2f}"
        )

try:
    adc1 = halleffectandadc(1, 17, 22000000, 100000)
    Thread1 = threading.Thread(target=adc1.halleffect_monitor)
    Thread2 = threading.Thread(target=adc1.adcreceiver, args=(event,))
    Thread1.start()
    Thread2.start()
    
except KeyboardInterrupt:
    spi.close()



Solution

  • Update

    The code (untested for obvious reasons) has been redone to use child processes rather than threads, because I was having difficulty handling KeyboardInterrupt exceptions when threads were waiting on threading.Event instances. Perhaps somebody can explain why. In any case, using processes can potentially improve performance, since each process has its own GIL. This does, however, require the numpy array self.buff and the new attribute self.elapsed_time to be stored in shared memory so that all processes can share the data.


    If I understand your question, you want to repeatedly gather data and plot it periodically. This needs careful synchronization between halleffect_monitor, adcreceiver and the main process, and for this we will need several events:

    1. self.start_event - This event is awaited by adcreceiver in order to start receiving data.
    2. self.started_event - This event is set by adcreceiver to let halleffect_monitor know that it has started receiving data.
    3. self.stop_event - This event is set by halleffect_monitor to tell adcreceiver to stop processing in case it is still receiving data.
    4. self.stopped_event - This event is set by adcreceiver to let halleffect_monitor know that it has stopped processing.
    5. self.start_plotting_event - This event is set by halleffect_monitor to tell the main thread to start plotting.
    6. self.plotting_completed_event - This event is set by the main thread to let halleffect_monitor know that plotting has been completed.

    This seems like a lot of events, but I believe they are required so that events can be cleared for the next receiving/plotting cycle without worrying about any possible race conditions.
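
    The start/stop part of this handshake can be tried without any hardware. The following is a minimal sketch under stated assumptions, not the code from this answer: run_cycles, hold_time and sample_delay are hypothetical names, the ADC read is replaced by writing the loop index, the wait for the magnet is replaced by a sleep, and the two plotting events are left out. The sketch clears started_event after each wait so that the flag is meaningful on every cycle.

```python
import threading
import time


def run_cycles(buff_size, cycles, hold_time=0.02, sample_delay=0.001):
    """Run `cycles` acquire/stop rounds; return the sample count of each round."""
    buff = [0] * buff_size
    counts = []                       # samples collected in each cycle
    state = {"count": 0}              # written by receiver, read by monitor
    start_event = threading.Event()
    started_event = threading.Event()
    stop_event = threading.Event()
    stopped_event = threading.Event()

    def receiver():
        for _ in range(cycles):
            start_event.wait()        # wait to be told to start
            start_event.clear()       # clear for next time
            started_event.set()       # show we have started
            i = 0
            while i < buff_size and not stop_event.is_set():
                buff[i] = i           # stand-in for get_adc_val()
                i += 1
                time.sleep(sample_delay)
            state["count"] = i
            stopped_event.set()       # show we have stopped

    def monitor():
        for _ in range(cycles):
            stop_event.clear()
            stopped_event.clear()
            start_event.set()         # start the receiver
            started_event.wait()      # wait until it is running
            started_event.clear()     # clear for the next cycle
            time.sleep(hold_time)     # stand-in for counting magnet edges
            stop_event.set()          # ask the receiver to stop
            stopped_event.wait()      # wait until it has stopped
            counts.append(state["count"])

    threads = [threading.Thread(target=receiver),
               threading.Thread(target=monitor)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counts


if __name__ == "__main__":
    print(run_cycles(buff_size=1000, cycles=3))
```

    With a small buffer and a long hold_time the receiver fills its buffer and then blocks on start_event until the next cycle. With a large buffer it is cut short by stop_event, which is the behaviour asked for in the question.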

    from multiprocessing import Process, Event, Array, Value
    import numpy as np
    import time
    import spidev
    import matplotlib.pyplot as plt
    import RPi.GPIO as GPIO
    
    def np_array_to_shared_array(np_array):
        shared_array = Array('B', np_array.nbytes, lock=False)
        arr = np.frombuffer(shared_array, np_array.dtype)
        arr[:] = np_array.flatten(order='C')
        return shared_array
    
    def shared_array_to_np_array(shared_array, shape, dtype):
        return np.ndarray(shape, dtype=dtype, buffer=shared_array)
    
    class halleffectandadc:
        def __init__(self, desired_clock_cycles, pin, clock_speed, buff_size):
            self.desired_clock_cycles = desired_clock_cycles
            self.pin = pin
            self.clock_speed = clock_speed
            self.buff_size = buff_size
    
            # Create np array in shared memory:
            arr = np.ndarray((buff_size,), dtype="uint16")
            shared_array = np_array_to_shared_array(arr)
            self.buff = shared_array_to_np_array(shared_array, arr.shape, arr.dtype)
    
            self.elapsed_time = Value('f', lock=False)  # Shared memory floating point
    
            self.start_event = Event()
            self.started_event = Event()
            self.stop_event = Event()
            self.stopped_event = Event()
            self.start_plotting_event = Event()
            self.plotting_completed_event = Event()
    
        def get_adc_val(self):
            response = self.spi.xfer([0x00, 0x00])
            return (response[0] << 8) + response[1]
    
        def adcreceiver(self):
            self.spi = spidev.SpiDev()
            self.spi.open(0, 0)
            self.spi.max_speed_hz = self.clock_speed
    
            try:
                while True:
                    # Wait for start event:
                    self.start_event.wait()
    
                    self.start_event.clear() # clear for next time
                    self.started_event.set() # show we have started
    
                    print("The process is starting to collect data", flush=True)
                    time_start = time.perf_counter()
    
                    i = 0
                    while i < self.buff_size and not self.stop_event.is_set():
                        self.buff[i] = self.get_adc_val()
                        i += 1
                        print(i, flush=True)
    
                    self.elapsed_time.value = time.perf_counter() - time_start
    
                    self.stopped_event.set() # show we have stopped
                    print("The process has been stopped to plot the data", flush=True)
            except KeyboardInterrupt:
                self.spi.close()
                print('adcreceiver terminating', flush=True)
            except Exception as e:
                print(e, flush=True)
    
    
        def halleffect_monitor(self):
            GPIO.setmode(GPIO.BCM)
            GPIO.setup(self.pin, GPIO.IN)
    
            try:
                while True:
                    # Zero out buffer:
                    self.buff.fill(0)
    
                    # clear events related to stopping
                    self.stop_event.clear()
                    self.stopped_event.clear()
    
                    # start the adcreceiver process:
                    self.start_event.set()
                    # wait for adcreceiver to start, then clear for the next cycle:
                    self.started_event.wait()
                    self.started_event.clear()
                    # at this point the self.start_event has been cleared by adcreceiver
                    # so if adcreceiver stops because its buffer has been filled it will
                    # block until self.start_event is set again
    
                    hall_effect_clock_count = 0
                    while hall_effect_clock_count < self.desired_clock_cycles:
                        print("Waiting....", flush=True)
                        if GPIO.wait_for_edge(self.pin, GPIO.RISING):
                            hall_effect_clock_count += 1
                            print(f"Magnetic Field Detected, Number of Magnetic Fields Detected: {hall_effect_clock_count}", flush=True)
    
                    # stop the adcreceiver process if it hasn't already stopped:
                    self.stop_event.set()
                    # wait for the adcreceiver process to have stopped:
                    self.stopped_event.wait()
    
                    # plot the data:
                    self.plotting_completed_event.clear()
                    self.start_plotting_event.set()
                    self.plotting_completed_event.wait()
            except KeyboardInterrupt:
                print('halleffect_monitor terminating', flush=True)
            except Exception as e:
                print(e, flush=True)
    
        def plot_data(self):
            time_points = np.linspace(0, self.elapsed_time.value, self.buff_size, endpoint=True)
            self.buff[0] = self.buff[1]
            self.buff[1] = self.buff[2]
            plt.plot(time_points, self.buff / 65535 * 5)
    
            plt.xlabel("Elapsed Time (s)", fontsize=12)
            plt.title("Change in Potentiometer Wiper Voltage", fontsize=12)
            plt.ylabel("Voltage (V)", fontsize=12)
            plt.show()
            print(
                f"dT: {self.elapsed_time.value}s, STD: {np.std(self.buff):.2f}, MIN: {min(self.buff)}, MAX: {max(self.buff)}, AVG: {np.mean(self.buff):.2f}"
            )
    
    def main():
        try:
            adc1 = halleffectandadc(1, 17, 22000000, 100000)
            p1 = Process(target=adc1.halleffect_monitor)
            p2 = Process(target=adc1.adcreceiver)
            p1.start()
            p2.start()
    
            print('Press Ctrl-C to terminate ...')
    
            while True:
                adc1.start_plotting_event.wait()
                adc1.start_plotting_event.clear()
                adc1.plot_data()
                adc1.plotting_completed_event.set()
        except KeyboardInterrupt:
            pass
    
        print('Waiting for processes to finish up ...')
        p1.join()
        p2.join()
    
    if __name__ == '__main__':
        main()
    

    Multithreading Version

    I believe I have found a resolution to the problem with receiving KeyboardInterrupt exceptions when waiting on a threading.Event instance: use a multiprocessing.Event instance instead.
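
    A different workaround that is sometimes suggested, and that is not what this answer does, is to wait with a timeout in a loop so that the main thread regularly returns to the interpreter and a pending Ctrl-C can be delivered. A minimal sketch, where wait_interruptibly and poll_interval are hypothetical names:

```python
import threading


def wait_interruptibly(event, poll_interval=0.1):
    """Block until `event` is set, waking every `poll_interval` seconds.

    Event.wait(timeout) returns False on timeout and True once the flag is
    set, so the loop ends as soon as another thread calls event.set().
    """
    while not event.wait(timeout=poll_interval):
        pass  # woke up on timeout; go back to waiting


if __name__ == "__main__":
    ready = threading.Event()
    # Set the event from another thread after a short delay.
    threading.Timer(0.2, ready.set).start()
    wait_interruptibly(ready)
    print("event received")
```

    The cost is a small latency of at most poll_interval and a periodic wake-up of the waiting thread.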

    import threading
    import multiprocessing
    import numpy as np
    import time
    import spidev
    import matplotlib.pyplot as plt
    import RPi.GPIO as GPIO
    
    class halleffectandadc:
        def __init__(self, desired_clock_cycles, pin, clock_speed, buff_size):
            self.start_event = threading.Event()
            self.started_event = threading.Event()
            self.stop_event = threading.Event()
            self.stopped_event = threading.Event()
            # Note the use of a multiprocessing.Event:
            self.start_plotting_event = multiprocessing.Event()
            self.plotting_completed_event = threading.Event()
    
            self.spi = spidev.SpiDev()
            self.spi.open(0, 0)
            self.spi.max_speed_hz = clock_speed
            self.buff_size = buff_size
            self.buff = np.ndarray((self.buff_size,), dtype="uint16")
            self.desired_clock_cycles = desired_clock_cycles
            self.pin = pin
            GPIO.setmode(GPIO.BCM)
            GPIO.setup(pin, GPIO.IN)
    
        def get_adc_val(self):
            response = self.spi.xfer([0x00, 0x00])
            return (response[0] << 8) + response[1]
    
        def adcreceiver(self):
            while True:
                # Wait for start event:
                self.start_event.wait()
                self.start_event.clear() # clear for next time
                self.started_event.set() # show we have started
    
                print("The thread is starting to collect data")
                self.time_start = time.perf_counter()
    
                i = 0
                while i < self.buff_size and not self.stop_event.is_set():
                    self.buff[i] = self.get_adc_val()
                    i += 1
                    print(i)
    
                self.time_end = time.perf_counter()
    
                self.stopped_event.set() # show we have stopped
                print("The thread has been stopped to plot the data")
    
    
        def halleffect_monitor(self):
            while True:
                # start with an empty buffer for each new cycle:
                self.buff.fill(0)
    
                # clear events related to stopping
                self.stop_event.clear()
                self.stopped_event.clear()
    
                # start the adcreceiver thread:
                self.start_event.set()
                # wait for adcreceiver to start, then clear for the next cycle:
                self.started_event.wait()
                self.started_event.clear()
                # at this point the self.start_event has been cleared by adcreceiver
                # so if adcreceiver stops because its buffer has been filled it will
                # block until self.start_event is set again
    
                hall_effect_clock_count = 0
                while hall_effect_clock_count < self.desired_clock_cycles:
                    print("Waiting....")
                    if GPIO.wait_for_edge(self.pin, GPIO.RISING):
                        hall_effect_clock_count += 1
                        print(f"Magnetic Field Detected, Number of Magnetic Fields Detected: {hall_effect_clock_count}")
    
                # stop the adcreceiver thread if it hasn't already stopped:
                self.stop_event.set()
                # wait for the adcreceiver thread to have stopped:
                self.stopped_event.wait()
    
                # plot the data:
                self.plotting_completed_event.clear()
                self.start_plotting_event.set()
                self.plotting_completed_event.wait()
    
        def plot_data(self):
            elapsed_time = self.time_end - self.time_start
    
            time_points = np.linspace(0, elapsed_time, self.buff_size, endpoint=True)
            self.buff[0] = self.buff[1]
            self.buff[1] = self.buff[2]
            plt.plot(time_points, self.buff / 65535 * 5)
    
            plt.xlabel("Elapsed Time (s)", fontsize=12)
            plt.title("Change in Potentiometer Wiper Voltage", fontsize=12)
            plt.ylabel("Voltage (V)", fontsize=12)
            plt.show()
            print(
                f"dT: {elapsed_time}s, STD: {np.std(self.buff):.2f}, MIN: {min(self.buff)}, MAX: {max(self.buff)}, AVG: {np.mean(self.buff):.2f}"
            )
    
    try:
        adc1 = halleffectandadc(1, 17, 22000000, 100000)
        # Make these daemon threads:
        threading.Thread(target=adc1.halleffect_monitor, daemon=True).start()
        threading.Thread(target=adc1.adcreceiver, daemon=True).start()
        print('Hit Ctrl-C to terminate ...\n')
        while True:
            adc1.start_plotting_event.wait()
            adc1.start_plotting_event.clear()
            adc1.plot_data()
            adc1.plotting_completed_event.set()
    except KeyboardInterrupt:
        adc1.spi.close()
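
    Note that plot_data in both versions spreads the time axis over all buff_size entries, so when adcreceiver is stopped early the unfilled, zeroed tail of the buffer is plotted as well. If only the collected samples should be shown, the receiver would have to record how many samples it wrote (the code above does not keep its loop counter i). A minimal sketch of the conversion in plain Python, assuming such a count is available; collected_trace, count, full_scale and v_ref are hypothetical names, and the 65535 and 5 V factors are taken from plot_data:

```python
def collected_trace(buff, count, elapsed_time, full_scale=65535, v_ref=5.0):
    """Return (time_points, voltages) for the first `count` samples of `buff`."""
    if count <= 0:
        return [], []
    # Spread the samples evenly over the measured acquisition time.
    step = elapsed_time / (count - 1) if count > 1 else 0.0
    time_points = [k * step for k in range(count)]
    # Same scaling as plot_data: raw 16-bit reading -> volts.
    voltages = [buff[k] / full_scale * v_ref for k in range(count)]
    return time_points, voltages


if __name__ == "__main__":
    raw = [0, 65535, 0, 0]  # only the first two entries were filled
    print(collected_trace(raw, count=2, elapsed_time=1.0))
```

    The two returned lists can be passed to plt.plot in place of time_points and self.buff / 65535 * 5.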