Search code examples
pythonmachine-learningkalman-filter

Implementing Kalman filter in Python - are these 5 equations implemented correctly?


There's an assignment about implementing Kalman-filter 1D.

The goal is to make the blue square hit the pink square.

Blue square gets data about the position of the pink square, but the data is not precise, it's noisy. The noisy data has standard deviation of 300 pixels.

The blue square uses Kalman-filter algorithm to get the precise position and hit the pink square.

Only "Kalman class" is allowed to edit, and I have implemented Kalman-filter algorithm in that class.

I used the 5 equations described, but it still doesn't work properly.

Here's the Kalman class where Kalman filter is implemented (and below that is the whole code):

class Kalman():
    def __init__(self):
        self._dt = 1  # time step
        self._pos = 785 # position
        self._vel = 1 # velocity
        self._acc = 0.05 # acceleration
        self._pos_est_err = 1000000 # position estimate uncertainty
        self._vel_est_err = 1000000 # velocity estimate uncertainty
        self._acc_est_err = 2000 # acceleration estimate uncertainty
        self._pos_mea_err = 300
        self._vel_mea_err = 100
        self._acc_mea_err = 4
        self._alpha = 1 # converges to 0.32819526927045667
        self._beta = 1 # converges to 0.18099751242241782
        self._gamma = 0.01 # the acceleration doesn't change much
        
        self._q = 30 # process uncerrainty for position
        self._q_v = 4 # process uncertainty for velocity
        self._q_a = 2 # process uncertainty for acceleration
        
        self._last_pos = 785
        self._measured_velocity = 0


    def calc_next(self, zi):
        
        self._measured_velocity = zi - self._last_pos
        self._last_pos = zi
        
        #State extrapolation
        self._pos = self._pos + self._vel*self._dt + 0.5*self._acc*self._dt**2
        self._vel = self._vel + self._acc*self._dt
        self._acc = self._acc
        
        #Covariance extrapolation
        self._pos_est_err = self._pos_est_err + self._dt**2 * self._vel_est_err + self._q
        self._vel_est_err = self._vel_est_err + self._q_v
        self._acc_est_err = self._acc_est_err + self._q_a
        
        #Alhpa-beta-gamma update
        self._alpha = self._pos_est_err / (self._pos_est_err + self._pos_mea_err)
        self._beta = self._vel_est_err / (self._vel_est_err + self._vel_mea_err)
        #self._gamma = self._acc_est_err / (self._acc_est_err + self._acc_mea_err)
        
        #State update
        pos_prev = self._pos
        self._pos = pos_prev + self._alpha*(zi - pos_prev)
        self._vel = self._vel + self._beta*((zi - pos_prev)/self._dt)
        self._acc = self._acc + self._gamma*((zi - pos_prev) / (0.5*self._dt**2))
        
        #Covariance update
        self._pos_est_err = (1-self._alpha)*self._pos_est_err
        self._vel_est_err = (1-self._beta)*self._vel_est_err
        self._acc_est_err = (1-self._gamma)*self._acc_est_err
        
        return self._pos

I used "Alpha-beta and gamma" values as Kalman gains, since the object (the pink square) is moving and changing its direction.

But how do I write equations for Covariance Extrapolation Equations and Covariance Update Equations?

I'm not sure if I have written them correctly.

THE WHOLE CODE

import pygame as pg
from random import random,randint
import numpy as np
from numpy.linalg import norm

fps = 0.0

class Projectile():

    def __init__(self, background, kalman=None):
        self.background = background
        self.rect = pg.Rect((800,700),(16,16))
        self.px = self.rect.x
        self.py = self.rect.y
        self.dx = 0.0
        self.kalm = kalman

    def move(self,goal):

        if self.kalm:
            goal = self.kalm.calc_next(goal)
        deltax = np.array(float(goal) - self.px)
        #print(delta2)
        mag_delta = norm(deltax)#* 500.0
        np.divide(deltax,mag_delta,deltax)

        self.dx += deltax
        #if self.dx:
            #self.dx /= norm(self.dx) * 50

        self.px += self.dx /50.0
        self.py += -0.5
        try:
            self.rect.x = int(self.px)
        except:
            pass
        try:
            self.rect.y = int(self.py)
        except:
            pass


class Target():

    def __init__(self, background, width):
        self.background = background
        self.rect = pg.Rect(self.background.get_width()//2-width//2, 
            50, width, 32)
        self.dx = 1 if random() > 0.5 else -1

    def move(self):
        self.rect.x += self.dx

        if self.rect.x < 300 or self.rect.x > self.background.get_width()-300:
            self.dx *= -1

    def noisy_x_pos(self):
        pos = self.rect.x
        center = self.rect.width//2
        noise = np.random.normal(0,1,1)[0]

        return pos + center + noise*300.0


class Kalman():
    def __init__(self):
        self._dt = 1  # time step
        self._pos = 785
        self._vel = 1
        self._acc = 0.05
        self._pos_est_err = 1000000
        self._vel_est_err = 1000000
        self._acc_est_err = 2000
        self._pos_mea_err = 300
        self._vel_mea_err = 100
        self._acc_mea_err = 4
        self._alpha = 1 # converges to 0.32819526927045667
        self._beta = 1 # converges to 0.18099751242241782
        self._gamma = 0.01 # the acceleration doesn't change much
        
        self._q = 30 # process uncerrainty for position
        self._q_v = 4 # process uncertainty for velocity
        self._q_a = 2 # process uncertainty for acceleration
        
        self._last_pos = 785
        self._measured_velocity = 0


    def calc_next(self, zi):
        
        self._measured_velocity = zi - self._last_pos
        self._last_pos = zi
        
        #State extrapolation
        self._pos = self._pos + self._vel*self._dt + 0.5*self._acc*self._dt**2
        self._vel = self._vel + self._acc*self._dt
        self._acc = self._acc
        
        #Covariance extrapolation
        self._pos_est_err = self._pos_est_err + self._dt**2 * self._vel_est_err + self._q
        self._vel_est_err = self._vel_est_err + self._q_v
        self._acc_est_err = self._acc_est_err + self._q_a
        
        #Alhpa-beta-gamma update
        self._alpha = self._pos_est_err / (self._pos_est_err + self._pos_mea_err)
        self._beta = self._vel_est_err / (self._vel_est_err + self._vel_mea_err)
        #self._gamma = self._acc_est_err / (self._acc_est_err + self._acc_mea_err)
        
        #State update
        pos_prev = self._pos
        self._pos = pos_prev + self._alpha*(zi - pos_prev)
        self._vel = self._vel + self._beta*((zi - pos_prev)/self._dt)
        self._acc = self._acc + self._gamma*((zi - pos_prev) / (0.5*self._dt**2))
        
        #Covariance update
        self._pos_est_err = (1-self._alpha)*self._pos_est_err
        self._vel_est_err = (1-self._beta)*self._vel_est_err
        self._acc_est_err = (1-self._gamma)*self._acc_est_err
        
        return self._pos


pg.init()

w,h = 1600,800

background = pg.display.set_mode((w,h))
surf = pg.surfarray.pixels3d(background)
running = True
clock = pg.time.Clock()

kalman_score = 0
reg_score = 0
iters = 0
count = 0

while running:
    target = Target(background, 32)
    missile = Projectile(background)
    k_miss = Projectile(background,Kalman())
    last_x_pos = target.noisy_x_pos
    noisy_draw = np.zeros((w,20))

    trial = True
    iters += 1

    while trial:

        # Setter en maksimal framerate på 300. Hvis dere vil øke denne er dette en mulig endring
        clock.tick(30000)
        fps = clock.get_fps()
        
        for e in pg.event.get():
            if e.type == pg.QUIT:
                running = False
        
        background.fill(0x448844)
        surf[:,0:20,0] = noisy_draw

        last_x_pos = target.noisy_x_pos()
        # print(last_x_pos)
        
        target.move()
        missile.move(last_x_pos)
        k_miss.move(last_x_pos) 

        pg.draw.rect(background, (255, 200, 0), missile.rect)
        pg.draw.rect(background, (0, 200, 255), k_miss.rect) 
        pg.draw.rect(background, (255, 200, 255), target.rect)

        noisy_draw[int(last_x_pos):int(last_x_pos)+20,:] = 255
        noisy_draw -= 1
        np.clip(noisy_draw, 0, 255, noisy_draw)

        coll = missile.rect.colliderect(target.rect)
        k_coll = k_miss.rect.colliderect(target.rect)

        if coll:
            reg_score += 1

        if k_coll:    
            kalman_score += 1

        oob = missile.rect.y < 20

        if oob or coll or k_coll:
            trial = False

        pg.display.flip()

    print('kalman score: ', round(kalman_score/iters,2))
    print('regular score: ', round(reg_score/iters,2))
    count += 1
    print("Nr. of tries:", count)

pg.quit()

Solution

  • Your covariance update appears correct, but the formula for the position estimate has an error. Consider that your process model is the linear transformation xk = Fk xk-1 (ignoring the noise terms), where xk is a column vector containing self._pos, self._vel, and self._acc, and Fk is the following matrix:

    1  ∆t  ½∆t²
    0   1   ∆t
    0   0   1
    

    where ∆t is self._dt in your code. The formula for the updated covariance matrix Pk|k-1, which in your case is a diagonal matrix, is Pk|k-1 = Fk Pk-1|k-1 FkT + Rk, where Pk-1|k-1 is the previous covariance matrix and Rk is the diagonal covariance matrix reflecting measurement error (i.e., self._pos_mea_err, self._vel_mea_err, and self._acc_mea_err). When you do the matrix multiplications, all the ∆t terms in Fk zero-out because they are not on the diagonal. There should be no ∆t terms in your covariance extrapolation.