Real time Moving Averages in Python


I need to calculate a moving average of sensor data that arrives on the serial port, using Python. All the numpy examples I can find use data from a file or data hard-coded in an array before the program starts.

In my case I do not have any data when the program starts. The data arrives in real time, one value every second, and I want to smooth it as it arrives on the serial port.

I have this working on the Arduino but also need it in Python. Can somebody please point me to a real-time example (a single value at a time) rather than a batch example?


Solution

  • Here's how you would add one reading at a time to a running collection of readings and return the average. I prepopulated the readings list to show it in action, but in your program, you'd just start off with an empty list: readings = []

    I made the assumption that you want to include the last x readings in your average rather than including all of the readings. That is what the max_samples parameter is for.

    without numpy:

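    readings = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    reading = 10
    max_samples = 10

    def mean(nums):
        # max(..., 1) avoids dividing by zero when the list is empty
        return float(sum(nums)) / max(len(nums), 1)

    # add the newest reading, then average everything in the window
    readings.append(reading)
    avg = mean(readings)

    print('current average =', avg)
    print('readings used for average:', readings)

    # drop the oldest reading once the window is full
    if len(readings) >= max_samples:
        readings.pop(0)

    print('readings saved for next time:', readings)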
    

    result:

    current average = 5.5
    readings used for average: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    readings saved for next time: [2, 3, 4, 5, 6, 7, 8, 9, 10]
    

    with numpy:

    import numpy as np

    readings = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9])
    reading = 10
    max_samples = 10

    # np.append returns a new array, so the result must be reassigned
    readings = np.append(readings, reading)
    avg = np.mean(readings)

    print('current average =', avg)
    print('readings used for average:', readings)

    # drop the oldest reading once the window is full
    if len(readings) >= max_samples:
        readings = np.delete(readings, 0)

    print('readings saved for next time:', readings)
    

    result:

    current average = 5.5
    readings used for average: [ 1  2  3  4  5  6  7  8  9 10]
    readings saved for next time: [ 2  3  4  5  6  7  8  9 10]
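
If the readings arrive one at a time, the same idea can be wrapped so that each new value goes in and the current average comes out. The following is only a minimal sketch, not part of the answer above: the `MovingAverage` class and the `fake_sensor` generator are hypothetical names, and `fake_sensor` merely stands in for whatever loop reads values from your serial port. It uses `collections.deque` with `maxlen`, which discards the oldest reading automatically, so no explicit `pop(0)` is needed.

```python
from collections import deque


class MovingAverage:
    """Hypothetical helper: average of the most recent max_samples readings."""

    def __init__(self, max_samples):
        if max_samples < 1:
            raise ValueError("max_samples must be at least 1")
        # A deque with maxlen drops its oldest item when a new one
        # is appended to a full window.
        self.readings = deque(maxlen=max_samples)

    def update(self, reading):
        """Add one reading and return the average of the current window."""
        self.readings.append(reading)
        return sum(self.readings) / len(self.readings)


def fake_sensor():
    """Stand-in for the serial port: yields one reading at a time."""
    for value in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]:
        yield value


smoother = MovingAverage(max_samples=10)
averages = [smoother.update(reading) for reading in fake_sensor()]

print('average after 10 readings =', averages[-2])
print('average after 11 readings =', averages[-1])
print('readings currently in the window:', list(smoother.readings))
```

In a real program you would presumably replace `fake_sensor()` with a loop that parses each value as it arrives and calls `smoother.update(value)` once per reading. Before the window fills up, the average covers only the readings received so far.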