Search code examples
pythonpytorchneural-networkregressionmlp

Tuning Pytorch MLP to perform parameter estimation instead of typical nonlinear regression


Let's say I have a function:

Y = 1-V Cos(k*X + phi),

And Y additionally has some noise (let's say Gaussian noise), which might look like the following figure:

enter image description here

I want to come up with an estimate for V, k, and phi. Normal nonlinear regression can do it. But I find it to be more of an art than science to get it to work well. I'd like to instead explore if some of the powerful AI tools can more consistently estimate these without a lot of work.

I have written some Pytorch code that attempts to estimate the parameters. It is an MPL that is fed in 100 input X values and 100 input Y values to have 200 total inputs to the MLP. Then the output to the MLP returns outputs (V, k, Phi). For the dataset for training, I randomly generate values of V, k, and phi, and then generate a noisy 100 point dataset for Y = 1-V(cos(k X + phi)).

After a few attempts playing around with the number of hidden layers, I managed to get some code that made fairly reasonable guessses, but still was not particularly good.

So I decided to try to use Optuna to look for the best hyperparmeters. I ran it overnight and it told me to increase the number of hidden layer nodes significantly.

But when I actually trained using this, I get a terrible result:

enter image description here

I'm not sure why it ended up being so much worse. Is this an overfitting issue? What am I doing wrong?

Here is the pytorch code to do the oridinary MLP that I describe:

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt



# Parameters for generating the data
n_samples = 200
n_sets = 1000  # number of different sets of parameters

# Generating x values 
x_values = np.linspace(-np.pi, np.pi, n_samples)
x_data = np.tile(x_values, (n_sets, 1)) # Repeat x_values for each set of parameters

V_values = np.random.uniform(0.1, 0.5, n_sets)
phi_values = np.random.uniform(-np.pi, np.pi, n_sets)
w_values = np.random.uniform(1, 5, n_sets)

# Generating y data based on the parameters 
y_data = []
targets = []
for V, phi, w in zip(V_values, phi_values, w_values):
    y = 1 - V * np.cos(w * x_values + phi)
    noise = np.random.normal(0, 0.1, y.shape)  
    y += noise
    y_data.append(y)
    targets.append([V, phi, w])


# Convert arrays to the tensor format
input_data_tensor = torch.tensor(np.concatenate((x_data, y_data), axis=1), dtype=torch.float32)
params_tensor = torch.tensor(targets, dtype=torch.float32)

# These values were obtained by running optima overnight:
layer1_units = 1556
layer2_units = 1520
lr = 0.06120801703537274
weight_decay = 0.008892619617512433

class ParamNet(nn.Module):
    def __init__(self):
        super(ParamNet, self).__init__()
        self.layer1 = nn.Linear(2 * n_samples, layer1_units)
        self.activ1 = nn.LeakyReLU()
        self.layer2 = nn.Linear(layer1_units, layer2_units)  
        self.activ2 = nn.LeakyReLU()
        self.param_layer = nn.Linear(layer2_units, 3)  # Parameter output layer

    def forward(self, x):
        x = self.activ1(self.layer1(x))
        x = self.activ2(self.layer2(x))
        params = self.param_layer(x)
        return params

model = ParamNet()

# Defining loss function and optimizer
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
loss_fn = nn.MSELoss()

# Training loop
epochs = 5000
for epoch in range(epochs):
    params = model(input_data_tensor)
    loss = loss_fn(params, params_tensor)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 500 == 0:
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {loss.item()}")

# Predicting parameters
predicted = model(input_data_tensor).detach().numpy()

# Evaluating model performance
mse = np.mean((predicted - targets)**2)
print("MSE: ", mse)

V_true = np.random.uniform(0.1, 0.5)
phi_true = np.random.uniform(-np.pi, np.pi)
w_true = np.random.uniform(1, 5)
y_new = 1 - V_true * np.cos(w_true * x_values + phi_true)
noise = np.random.normal(0, 0.1, y_new.shape)  # you can vary the amplitude of the noise
y_new += noise

# Prepare the tensor for model
x_values_tensor = torch.tensor(x_values, dtype=torch.float32).repeat(1, 1)  # repeat x_values for each set
y_new_tensor = torch.tensor(y_new, dtype=torch.float32).unsqueeze(0)  # unsqueeze to add one more dimension for batch
input_new_tensor = torch.cat((x_values_tensor, y_new_tensor), dim=-1)  # concatenate along the last dimension

# Pass the new data through the trained model
predicted_params = model(input_new_tensor).detach().numpy()[0]
V_pred, phi_pred, w_pred = predicted_params
y_pred = 1 - V_pred * np.cos(w_pred * x_values + phi_pred)

# Plot the original and predicted functions
plt.figure(figsize=(10, 6))
plt.plot(x_values, y_new, label='Original function')
plt.plot(x_values, y_pred, label='Predicted function', linestyle='--')
plt.xlabel('x')
plt.ylabel('y')
plt.legend()
plt.show()

And here is the Optuna hyperparameter estimation attempt:

import torch
import torch.nn as nn
import numpy as np
import optuna
from skorch import NeuralNetRegressor
import torch.optim as optim

# Parameters for generating the data
n_samples = 200
n_sets = 1000  # number of different sets of parameters

# Generating x values 
x_values = np.linspace(-np.pi, np.pi, n_samples)
x_data = np.tile(x_values, (n_sets, 1)) 

V_values = np.random.uniform(0.1, 0.5, n_sets)
phi_values = np.random.uniform(-np.pi, np.pi, n_sets)
w_values = np.random.uniform(1, 5, n_sets)

# Generating y data based on the parameters 
y_data = []
targets = []
for V, phi, w in zip(V_values, phi_values, w_values):
    y = 1 - V * np.cos(w * x_values + phi)
    noise = np.random.normal(0, 0.1, y.shape)  
    y += noise
    y_data.append(y)
    targets.append([V, phi, w])

# Convert arrays to the tensor format
input_data_tensor = torch.tensor(np.concatenate((x_data, y_data), axis=1), dtype=torch.float32)
params_tensor = torch.tensor(targets, dtype=torch.float32)

class ParamNet(nn.Module):
    def __init__(self, layer1_units=600, layer2_units=600):
        super(ParamNet, self).__init__()
        self.layer1 = nn.Linear(2 * n_samples, layer1_units)
        self.activ1 = nn.LeakyReLU()
        self.layer2 = nn.Linear(layer1_units, layer2_units)  
        self.activ2 = nn.LeakyReLU()
        self.param_layer = nn.Linear(layer2_units, 3)  # Parameter output layer

    def forward(self, x):
        x = self.activ1(self.layer1(x))
        x = self.activ2(self.layer2(x))
        params = self.param_layer(x)
        return params

def objective(trial):
    # Define the hyperparameters
    layer1_units = trial.suggest_int('layer1_units', 500, 2000)
    layer2_units = trial.suggest_int('layer2_units', 500, 2000)
    lr = trial.suggest_loguniform('lr', 0.01, 0.1)
    weight_decay = trial.suggest_loguniform('weight_decay', 1e-5, 0.01)
    
    model = ParamNet(layer1_units, layer2_units)

    # Training process
    net = NeuralNetRegressor(
        model,
        max_epochs=2000,
        lr=lr,
        optimizer=torch.optim.Adam,
        optimizer__weight_decay=weight_decay,
        device='cuda' if torch.cuda.is_available() else 'cpu'
    )
    
    net.initialize() 
    net.fit(input_data_tensor, params_tensor)
    
    # Return the final loss
    loss = net.history[-1, 'train_loss']
    return loss

study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=50)

print("Best trial:")
trial = study.best_trial

print(trial.params)

EDIT: Here's an attempt to break down the input data into different mini-batches, as suggested by user @Klops:

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader


# Parameters for generating the data
n_samples = 200
n_sets = 1000  # number of different sets of parameters

# Generating x values 
x_values = np.linspace(-np.pi, np.pi, n_samples)
x_data = np.tile(x_values, (n_sets, 1)) # Repeat x_values for each set of parameters

V_values = np.random.uniform(0.1, 0.5, n_sets)
phi_values = np.random.uniform(-np.pi, np.pi, n_sets)
w_values = np.random.uniform(1, 5, n_sets)





# Generating y data based on the parameters 
y_data = []
targets = []
for V, phi, w in zip(V_values, phi_values, w_values):
    y = 1 - V * np.cos(w * x_values + phi)
    noise = np.random.normal(0, 0.1, y.shape)  
    y += noise
    y_data.append(y)
    targets.append([V, phi, w])


# Convert arrays to the tensor format
input_data_tensor = torch.tensor(np.concatenate((x_data, y_data), axis=1), dtype=torch.float32)
params_tensor = torch.tensor(targets, dtype=torch.float32)

# Convert arrays to tensors
dataset = torch.utils.data.TensorDataset(input_data_tensor, params_tensor)
# Define dataloader with a specific batch size (let's set it to 32)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

# These values were obtained by running optima overnight:
layer1_units = 200
layer2_units = 300
lr = 0.06120801703537274
weight_decay = 0.008892619617512433

class ParamNet(nn.Module):
    def __init__(self):
        super(ParamNet, self).__init__()
        self.layer1 = nn.Linear(2 * n_samples, layer1_units)
        self.activ1 = nn.LeakyReLU()
        self.layer2 = nn.Linear(layer1_units, layer2_units)  
        self.activ2 = nn.LeakyReLU()
        self.param_layer = nn.Linear(layer2_units, 3)  # Parameter output layer

    def forward(self, x):
        x = self.activ1(self.layer1(x))
        x = self.activ2(self.layer2(x))
        params = self.param_layer(x)
        return params

model = ParamNet()

# Defining loss function and optimizer
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
loss_fn = nn.MSELoss()

# Training loop
epochs = 3000
for epoch in range(epochs):
    for batch_idx, (data, target) in enumerate(dataloader):
        optimizer.zero_grad()
        params = model(data)
        loss = loss_fn(params, target)
        loss.backward()
        optimizer.step()
    if (epoch + 1) % 500 == 0:
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {loss.item()}")

# Predicting parameters
predicted = model(input_data_tensor).cpu().detach().numpy()
# predicted = model(input_data_tensor).detach().numpy()

# Evaluating model performance
mse = np.mean((predicted - targets)**2)
print("MSE: ", mse)

V_true = np.random.uniform(0.1, 0.5)
phi_true = np.random.uniform(-np.pi, np.pi)
w_true = np.random.uniform(1, 5)
y_new = 1 - V_true * np.cos(w_true * x_values + phi_true)
noise = np.random.normal(0, 0.1, y_new.shape)  # you can vary the amplitude of the noise
y_new += noise

# Prepare the tensor for model
x_values_tensor = torch.tensor(x_values, dtype=torch.float32).repeat(1, 1)  # repeat x_values for each set
y_new_tensor = torch.tensor(y_new, dtype=torch.float32).unsqueeze(0)  # unsqueeze to add one more dimension for batch
input_new_tensor = torch.cat((x_values_tensor, y_new_tensor), dim=-1)  # concatenate along the last dimension

# Pass the new data through the trained model

predicted_params = model(input_new_tensor).detach().numpy()[0]
V_pred, phi_pred, w_pred = predicted_params
y_pred = 1 - V_pred * np.cos(w_pred * x_values + phi_pred)

# Plot the original and predicted functions
plt.figure(figsize=(10, 6))
plt.plot(x_values, y_new, label='Original function')
plt.plot(x_values, y_pred, label='Predicted function', linestyle='--')
plt.xlabel('x')
plt.ylabel('y')
plt.legend()
plt.show()

Solution

  • The main reason for your model to fail is the very large learning rate. Simply by reducing the lr i got the model to work

    lr = 0.001

    How did I find out? I saw that there was no loss convergence and I was wondering why the model was becoming better and worse in alternating patterns... the reason was the learning rate :-)

    result:

    enter image description here