Search code examples
pythonmachine-learninglinear-regressiongradient-descent

My python implementation of Gradient Descent is not working well


I am trying to create a Linear regression model that uses batch gradient descent but the error or mse value never decreases. The LinearModel is just a template class that initializes the hyperparameters (step_size=0.001, max_iter=10000, eps=0.001, theta_0=None, verbose=True)

# The data is stored in an array of (m, n), where there are n #columns and m #rows
# x = array((m,n))
# y = array((m,1))

class LinearRegression(LinearModel):
  """Linear regression with Gradient Descent."""

  def fit(self, x, y):
    """Run Gradient Descent Method to minimize J(theta) for Linear Regression.

    :param x: Training example inputs. Shape (m, n).
    :param y: Training example labels. Shape (m,).
    """

    def squared_error(theta, x,y):
      # Initialize a variable to store the total error
      error = 0
        # Calculate the predicted value using the current parameter values
        # Calculate the difference between the predicted and actual values
        # Add the squared difference to the total error
      # Return the total error divided by the number of data points
      return np.mean((x.dot(theta) - y)**2)

    # Define a function to calculate the partial derivatives of the squared error with respect to each parameter
    def grad_squared_error(theta, x,y):
      m,n = x.shape
      # Initialize a list to store the partial derivatives
      grad = np.zeros(n)

      for j in range(n):
        # Update the partial derivatives using the chain rule
        grad[j] += ((x.dot(theta)) - y).dot(x[:,j][np.newaxis].T) #np.newaxis to make the numpy array 2D for Transpose

      # Return the partial derivatives divided by the number of (data points *2)
      return [g / (2*m) for g in grad]


    m,n = x.shape
    if self.theta == None:
      self.theta = np.zeros(n)


    decay_factor = 0.9  # Adjust this as needed
    decay_interval = 10  # Adjust this as needed

    for i in range(self.max_iter):
      # Calculate the partial derivatives of the squared error using the current parameter values
      grad = grad_squared_error(self.theta, x,y)

      # Update each parameter value using gradient descent formula
      for j in range(len(self.theta)):
        self.theta[j] -= self.step_size * grad[j]

      # Print the current parameter values and squared error
      if self.verbose and i % decay_interval == 0:
        self.step_size *= decay_factor
        print(f"Iteration {i+1}: theta = {self.theta}, squared error = {squared_error(self.theta, x,y)}")


      if squared_error(self.theta, x,y) < self.eps:
        if self.verbose:
          print("Converged With Parameters:")
          print(f"Iteration {i+1}: theta = {self.theta}, squared error = {squared_error(self.theta, x,y)}")
        break



  def predict(self, x):
      """Make a prediction given new inputs x.

      :param x: Inputs of shape (m, n).
      :return:  Outputs of shape (m,).
      """
      return np.dot(x, self.theta)

A few last output values are following.

Iteration 9911: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9921: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9931: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9941: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9951: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9961: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9971: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9981: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9991: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878

I have tried even decreasing the learning_rate after 10 steps but still not working. I have used the Fish_Market data from Kaggle: https://www.kaggle.com/datasets/tarunkumar1912/fish-dataset1?select=Fish_dataset.csv


Solution

  • So, I was able to get the Code to work as intended.

    • I had to vectorize the code as much as possible to make it run faster (Gradient Descent and Stochastic Gradient Descent were taking AGES to find the params of training set of shape (18000,7).
    • I was doing some wrong vector calculations in the above code specifically inside the for loop over range(m) => (columns of input training set) (For Batch Gradient Descent)

    Here's the code:

    class LinearRegression2(LinearModel):
    """Linear regression with Multiple Methods."""
    
    def fit(self, x, y):
        """Run Gradient Descent Method to minimize J(theta) for Linear Regression.
    
        :param x: Training example inputs. Shape (m, n).
        :param y: Training example labels. Shape (m,).
        """
    
        m, n = x.shape
        if self.theta is None:
            self.theta = np.zeros(n)
        
        J_arr = []  # To store cost values for plotting
        e_arr = []  # To store MSE values for plotting
        J = 0
        error = 0
    
        if self.method == 'norm':
            self.theta = np.linalg.inv(x.T @ x) @ (x.T @ y)
            J = np.mean((y - x @ self.theta) ** 2) / 2.0
            error = np.mean((x @ self.theta - y) ** 2)
    
            if self.verbose:
                print(f"J: {J}, squared error = {np.mean((y - x @ self.theta) ** 2)}, theta = {self.theta} ")
    
        if self.method == 'gdb':
            decay_factor = 0.5  # Adjust this as needed (if needed)
            decay_interval = 10  # Adjust this as needed (if needed)
    
            for i in range(self.max_iter):
                error = y - x @ self.theta
                J = np.mean((y - x @ self.theta) ** 2) / 2.0
                J_arr.append(J)
                e_arr.append(np.mean(error ** 2))
    
                gradient = -x.T @ error / m
                self.theta -= self.step_size * gradient
    
                # if i % decay_interval == 0:
                #     self.step_size *= decay_factor
    
                if self.verbose and i % decay_interval == 0:
                    print(f"J: {J}, squared error = {np.mean(error ** 2)}, theta = {self.theta} ")
    
                if J < self.eps and self.verbose:
                    print("Converged With Parameters:")
                    print(f"J: {J}, squared error = {np.mean(error ** 2)}, theta = {self.theta} ")
                    break
    
        if self.method == 'gds':
            decay_factor = 0.5  # Adjust this as needed
            decay_interval = 10  # Adjust this as needed
            flag = False
    
            for _ in range(self.max_iter):
    
              for i in range(m):
                idx = np.random.randint(0, m)
                xi = x[idx:idx+1]
                yi = y[idx:idx+1]
                error = yi - xi @ self.theta
                J = np.mean((yi - xi @ self.theta) ** 2) / 2.0
                J_arr.append(J)
                e_arr.append(np.mean(error ** 2))
    
                gradient = - (error @ xi) / m
                self.theta -= self.step_size * gradient
    
                # if i % decay_interval == 0:
                #     self.step_size *= decay_factor
    
                if self.verbose and i % decay_interval == 0:
                  print(f"J: {J}, squared error = {np.mean(error ** 2)}, theta = {self.theta} ")
    
                if J < self.eps:
                  flag=True
                  break
    
              if self.verbose and flag==True:
                  print("Converged With Parameters:")
                  print(f"J: {J}, squared error = {np.mean(error ** 2)}, theta = {self.theta} ")
              break
    
        if self.plot and self.method != 'norm':
            fig, axes = plt.subplots(1, 2, constrained_layout=True, figsize=(15, 5))
            axes[0].set_title("J(theta)")
            axes[0].set_xlabel(f"iterations")
            axes[0].set_ylabel('J(theta)')
            axes[0].plot(J_arr, color='red')
            axes[1].set_title("mse")
            axes[1].set_xlabel(f"iterations")
            axes[1].set_ylabel('mse')
            axes[1].plot(e_arr, color='g')
        elif self.plot and self.method == 'norm':
            print('')
            print('----------------------------------------------------------')
            print('Normal Equation used to find "theta", Graph Not Available.')
        else:
            pass
    
    def predict(self, x):
        """Make a prediction given new inputs x.
    
        :param x: Inputs of shape (m, n).
        :return:  Outputs of shape (m,).
        """
        return x @ self.theta