Search code examples
transitioncovariancekalman-filterpykalman

Kalman Filter (pykalman): Value for obs_covariance and model without intercept


I am looking at the KalmanFilter from pykalman shown in examples:

pykalman documentation

Example 1

Example 2

and I am wondering

observation_covariance=100,

vs

observation_covariance=1,

the documentation states

observation_covariance R: e(t)^2 ~ Gaussian (0, R)

How should the value be set here correctly?

Additionally, is it possible to apply the Kalman filter without intercept in the above module?


Solution

  • The observation covariance shows how much error you assume to be in your input data. Kalman filter works fine on normally distributed data. Under this assumption you can use the 3-Sigma rule to calculate the covariance (in this case the variance) of your observation based on the maximum error in the observation.

    The values in your question can be interpreted as follows:

    Example 1

    observation_covariance = 100
    sigma = sqrt(observation_covariance) = 10
    max_error = 3*sigma = 30
    

    Example 2

    observation_covariance = 1
    sigma = sqrt(observation_covariance) = 1
    max_error = 3*sigma = 3
    

    So you need to choose the value based on your observation data. The more accurate the observation, the smaller the observation covariance.

    Another point: you can tune your filter by manipulating the covariance, but I think it's not a good idea. The higher the observation covariance value the weaker impact a new observation has on the filter state.

    Sorry, I did not understand the second part of your question (about the Kalman Filter without intercept). Could you please explain what you mean? You are trying to use a regression model and both intercept and slope belong to it.

    ---------------------------

    UPDATE

    I prepared some code and plots to answer your questions in details. I used EWC and EWA historical data to stay close to the original article.

    First of all here is the code (pretty the same one as in the examples above but with a different notation)

    from pykalman import KalmanFilter
    import numpy as np
    import matplotlib.pyplot as plt
    
    # reading data (quick and dirty)
    Datum=[]
    EWA=[]
    EWC=[]
    
    for line in open('data/dataset.csv'):
        f1, f2, f3  = line.split(';')
        Datum.append(f1)
        EWA.append(float(f2))
        EWC.append(float(f3))
    
    n = len(Datum)
    
    # Filter Configuration
    
    # both slope and intercept have to be estimated
    
    # transition_matrix  
    F = np.eye(2) # identity matrix because x_(k+1) = x_(k) + noise
    
    # observation_matrix  
    # H_k = [EWA_k   1]
    H = np.vstack([np.matrix(EWA), np.ones((1, n))]).T[:, np.newaxis]
    
    # transition_covariance 
    Q = [[1e-4,     0], 
         [   0,  1e-4]] 
    
    # observation_covariance 
    R = 1 # max error = 3
    
    # initial_state_mean
    X0 = [0,
          0]
    
    # initial_state_covariance
    P0 = [[  1,    0], 
          [  0,    1]]
    
    # Kalman-Filter initialization
    kf = KalmanFilter(n_dim_obs=1, n_dim_state=2,
                      transition_matrices = F, 
                      observation_matrices = H, 
                      transition_covariance = Q, 
                      observation_covariance = R, 
                      initial_state_mean = X0, 
                      initial_state_covariance = P0)
    
    # Filtering
    state_means, state_covs = kf.filter(EWC)
    
    # Restore EWC based on EWA and estimated parameters
    EWC_restored = np.multiply(EWA, state_means[:, 0]) + state_means[:, 1]    
    
    # Plots
    plt.figure(1)
    ax1 = plt.subplot(211)
    plt.plot(state_means[:, 0], label="Slope")
    plt.grid()
    plt.legend(loc="upper left")
    
    ax2 = plt.subplot(212)
    plt.plot(state_means[:, 1], label="Intercept")
    plt.grid()
    plt.legend(loc="upper left")
    
    # check the result
    plt.figure(2)
    plt.plot(EWC, label="EWC original")
    plt.plot(EWC_restored, label="EWC restored")
    plt.grid()
    plt.legend(loc="upper left")
    
    plt.show()
    

    I could not retrieve data using pandas, so I downloaded them and read from the file.

    Here you can see the estimated slope and intercept:

    Linear regression using Kalman Filter. Both Slope and Intercept estimated

    To test the estimated data I restored the EWC value from the EWA using the estimated parameters:

    Original and restored function using estimated slope and interception

    About the observation covariance value

    By varying the observation covariance value you tell the Filter how accurate the input data is (normally you just describe your confidence in the observation using some datasheets or your knowledge about the system).

    Here are estimated parameters and the restored EWC values using different observation covariance values:

    slope and intercept using different observation covariance values

    restored function using different observation covariance values

    You can see the filter follows the original function better with a bigger confidence in observation (smaller R). If the confidence is low (bigger R) the filter leaves the initial estimate (slope = 0, intercept = 0) very slowly and the restored function is far away from the original one.

    About the frozen intercept

    If you want to freeze the intercept for some reason, you need to change the whole model and all filter parameters.

    In the normal case we had:

    x = [slope; intercept]    #estimation state
    H = [EWA    1]            #observation matrix
    z = [EWC]                 #observation
    

    Now we have:

    x = [slope]               #estimation state
    H = [EWA]                 #observation matrix
    z = [EWC-const_intercept] #observation
    

    Results:

    estimation with the constant intercept

    restored function using the constant intercept

    Here is the code:

    from pykalman import KalmanFilter
    import numpy as np
    import matplotlib.pyplot as plt
    
    # only slope has to be estimated (it will be manipulated by the constant intercept) - mathematically incorrect!
    const_intercept = 10
    
    # reading data (quick and dirty)
    Datum=[]
    EWA=[]
    EWC=[]
    
    for line in open('data/dataset.csv'):
        f1, f2, f3  = line.split(';')
        Datum.append(f1)
        EWA.append(float(f2))
        EWC.append(float(f3))
    
    n = len(Datum)
    
    # Filter Configuration
    
    # transition_matrix  
    F = 1 # identity matrix because x_(k+1) = x_(k) + noise
    
    # observation_matrix  
    # H_k = [EWA_k]
    H = np.matrix(EWA).T[:, np.newaxis]
    
    # transition_covariance 
    Q = 1e-4 
    
    # observation_covariance 
    R = 1 # max error = 3
    
    # initial_state_mean
    X0 = 0
    
    # initial_state_covariance
    P0 = 1    
    
    # Kalman-Filter initialization
    kf = KalmanFilter(n_dim_obs=1, n_dim_state=1,
                      transition_matrices = F, 
                      observation_matrices = H, 
                      transition_covariance = Q, 
                      observation_covariance = R, 
                      initial_state_mean = X0, 
                      initial_state_covariance = P0)
    
    # Creating the observation based on EWC and the constant intercept
    z = EWC[:] # copy the list (not just assign the reference!)
    z[:] = [x - const_intercept for x in z]
    
    # Filtering
    state_means, state_covs = kf.filter(z) # the estimation for the EWC data minus constant intercept
    
    # Restore EWC based on EWA and estimated parameters
    EWC_restored = np.multiply(EWA, state_means[:, 0]) + const_intercept
    
    # Plots
    plt.figure(1)
    ax1 = plt.subplot(211)
    plt.plot(state_means[:, 0], label="Slope")
    plt.grid()
    plt.legend(loc="upper left")
    
    ax2 = plt.subplot(212)
    plt.plot(const_intercept*np.ones((n, 1)), label="Intercept")
    plt.grid()
    plt.legend(loc="upper left")
    
    # check the result
    plt.figure(2)
    plt.plot(EWC, label="EWC original")
    plt.plot(EWC_restored, label="EWC restored")
    plt.grid()
    plt.legend(loc="upper left")
    
    plt.show()