Search code examples
pytorch predict mlflow

How to make a mlflow model predict?


I recently built a GNN model using TransformerConv and TopKPooling. Training runs smoothly, but when I load the logged model and try to use it for prediction, it fails with an error saying that the TransformerConv object has no 'aggr_module' attribute.

This is my network:

class GNN(torch.nn.Module):
    """Graph-level classifier built from TransformerConv blocks and TopKPooling.

    An initial TransformerConv embeds the raw node features. It is followed by
    ``model_layers`` blocks of TransformerConv -> Linear -> ReLU -> BatchNorm1d.
    Every ``model_top_k_every_n``-th block is followed by a TopKPooling layer,
    after which a pooled graph representation is recorded. The recorded
    representations are summed and passed through three linear layers that
    produce 3 output values per graph.

    Args:
        feature_size: Number of input features per node.
        model_params: Dict with the keys ``model_embedding_size``,
            ``model_attention_heads``, ``model_layers``,
            ``model_dropout_rate``, ``model_top_k_ratio``,
            ``model_top_k_every_n`` and ``model_dense_neurons``.
    """

    def __init__(self, feature_size, model_params):
        super().__init__()
        embedding_size = model_params["model_embedding_size"]
        n_heads = model_params["model_attention_heads"]
        self.n_layers = model_params["model_layers"]
        dropout_rate = model_params["model_dropout_rate"]
        top_k_ratio = model_params["model_top_k_ratio"]
        self.top_k_every_n = model_params["model_top_k_every_n"]
        dense_neurons = model_params["model_dense_neurons"]

        self.conv_layers = ModuleList([])
        self.transf_layers = ModuleList([])
        self.pooling_layers = ModuleList([])
        self.bn_layers = ModuleList([])

        # Transformation layer: maps the original node features to
        # n_heads concatenated embeddings of size embedding_size.
        # Edge features are not used (no edge_dim is passed).
        self.conv1 = TransformerConv(feature_size,
                                     embedding_size,
                                     heads=n_heads,
                                     dropout=dropout_rate,
                                     beta=True)

        # Collapse the concatenated heads back to a single embedding.
        self.transf1 = Linear(embedding_size * n_heads, embedding_size)
        self.bn1 = BatchNorm1d(embedding_size)

        # Other layers: message passing and pooling
        for i in range(self.n_layers):
            self.conv_layers.append(TransformerConv(embedding_size,
                                                    embedding_size,
                                                    heads=n_heads,
                                                    dropout=dropout_rate,
                                                    beta=True))

            # Map conv output back to embedding_size
            # (embedding_size * n_heads -> embedding_size)
            self.transf_layers.append(Linear(embedding_size * n_heads, embedding_size))
            # Batch normalization
            self.bn_layers.append(BatchNorm1d(embedding_size))
            # Top-k pooling to reduce the size of the graph
            if i % self.top_k_every_n == 0:
                self.pooling_layers.append(TopKPooling(embedding_size, ratio=top_k_ratio))

        # Linear output layers: the input is embedding_size * 2 because the
        # graph representation concatenates two pooled vectors (see forward).
        half_dense = int(dense_neurons / 2)
        self.linear1 = Linear(embedding_size * 2, dense_neurons)
        self.linear2 = Linear(dense_neurons, half_dense)
        self.linear3 = Linear(half_dense, 3)

    def forward(self, x, edge_index, batch_index):
        """Compute the 3 output values for every graph in the batch.

        Args:
            x: Node feature matrix.
            edge_index: Graph connectivity passed to the conv/pooling layers.
            batch_index: Assignment of each node to its graph in the batch.

        Returns:
            Tensor with 3 values per graph (output of ``linear3``; no
            softmax/sigmoid is applied here).
        """
        # Initial transformation. A single ReLU is enough: applying ReLU a
        # second time to an already non-negative tensor changes nothing.
        x = self.conv1(x, edge_index)
        x = torch.relu(self.transf1(x))
        x = self.bn1(x)

        # Holds the intermediate graph representations
        global_representation = []

        for i in range(self.n_layers):
            x = self.conv_layers[i](x, edge_index)
            x = torch.relu(self.transf_layers[i](x))
            x = self.bn_layers[i](x)

            # Pool after every top_k_every_n-th block.
            # NOTE(review): the previous condition also had
            # `or i == self.n_layers`, which can never be true inside
            # range(self.n_layers). Making the last layer always aggregate
            # would need its own pooling layer in __init__, so the dead
            # branch is dropped rather than "fixed" here.
            if i % self.top_k_every_n == 0:
                pool = self.pooling_layers[int(i / self.top_k_every_n)]
                x, edge_index, _, batch_index, _, _ = pool(x,
                                                           edge_index,
                                                           None,
                                                           batch_index)
                # Add current representation: two pooled vectors side by side
                # (gmp/gap are defined outside this block; presumably
                # global max / global mean pooling -- confirm at the import).
                global_representation.append(
                    torch.cat([gmp(x, batch_index), gap(x, batch_index)], dim=1))

        x = sum(global_representation)

        # Output block
        x = torch.relu(self.linear1(x))
        x = F.dropout(x, p=0.8, training=self.training)
        x = torch.relu(self.linear2(x))
        x = F.dropout(x, p=0.8, training=self.training)
        x = self.linear3(x)

        return x

One training:

def run_one_training(params):
    """Train one GNN inside an MLflow run and return ``[best_test_loss]``.

    Args:
        params: Sequence whose first element is the hyperparameter dict.
            Keys starting with ``model_`` are forwarded to ``GNN``; the
            optimiser/scheduler read ``batch_size``, ``learning_rate``,
            ``sgd_momentum``, ``weight_decay`` and ``scheduler_gamma``.

    Returns:
        One-element list holding the lowest test loss seen (1000 if no
        evaluation ever improved on that initial value).
    """
    params = params[0]
    with mlflow.start_run():
        # Log parameters used in this experiment
        for name, value in params.items():
            mlflow.log_param(name, value)

        # Loading the dataset
        print("Loading dataset...")
        full_dataset = ProcessedDataset(root = "data/", filename = "fixtures_full.csv")
        # NOTE(review): the return value is discarded. If ProcessedDataset
        # derives from the PyG Dataset class, shuffle() returns a shuffled
        # copy and this call leaves full_dataset unchanged -- confirm intent.
        full_dataset.shuffle()
        train_dataset = full_dataset[:3400]  # around 80% of the full dataset
        test_dataset = full_dataset[3400:3800]

        # Prepare training
        print("Preparing Training")
        batch_size = params["batch_size"]
        train_loader = DataLoader(train_dataset, batch_size=batch_size)
        test_loader = DataLoader(test_dataset, batch_size=batch_size)

        # Loading the model
        print("Loading model...")
        model_params = {name: value for name, value in params.items()
                        if name.startswith("model_")}
        model = GNN(feature_size=train_dataset[0].x.shape[1], model_params=model_params)

        print(model)
        model = model.to(device)
        print(f"Number of parameters: {count_parameters(model)}")
        mlflow.log_param("num_params", count_parameters(model))

        # NOTE(review): these weights are built but never handed to the loss
        # below, so training is currently unweighted -- confirm whether
        # CrossEntropyLoss(weight=...) was intended.
        class_weights = torch.tensor([1.0239, 1.2753, 0.8070], dtype=torch.float)

        loss_fn = torch.nn.CrossEntropyLoss()
        optimizer = torch.optim.SGD(model.parameters(),
                                    lr=params["learning_rate"],
                                    momentum=params["sgd_momentum"],
                                    weight_decay=params["weight_decay"])

        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=params["scheduler_gamma"])

        # Start training
        best_loss = 1000
        early_stopping_counter = 0
        for epoch in range(300):
            # The test loss is only evaluated every 5th epoch, so the counter
            # counts evaluations without improvement, not epochs.
            if early_stopping_counter > 10:
                print("Early stopping due to no improvement.")
                return [best_loss]

            # Training
            model.train()
            loss = train_one_epoch(epoch, model, train_loader, optimizer, loss_fn)
            print(f"Epoch {epoch} | Train Loss {loss}")
            mlflow.log_metric(key="Train loss", value=float(loss), step=epoch)

            # Testing
            model.eval()
            if epoch % 5 == 0:
                loss = test(epoch, model, test_loader, loss_fn)
                print(f"Epoch {epoch} | Test Loss {loss}")
                mlflow.log_metric(key="Test loss", value=float(loss), step=epoch)

                # Update best loss
                if float(loss) < best_loss:
                    best_loss = loss
                    # Save the currently best model
                    mlflow.pytorch.log_model(model, "model", signature=SIGNATURE)
                    early_stopping_counter = 0
                else:
                    early_stopping_counter += 1

            scheduler.step()
    print(f"Finishing training with best test loss: {best_loss}")
    return [best_loss]

Train and Test

def train_one_epoch(epoch, model, train_loader, optimizer, loss_fn):
    """Run one optimisation pass over ``train_loader``.

    Args:
        epoch: Epoch number, forwarded to ``calculate_metrics``.
        model: Model called as ``model(x, edge_index, batch)``.
        train_loader: Iterable of batches exposing ``x``, ``edge_index``,
            ``batch`` and ``y``.
        optimizer: Optimiser stepped once per batch.
        loss_fn: Loss applied to ``(pred, batch.y.long())``.

    Returns:
        Mean of the per-batch losses (sum of batch losses / number of batches).
    """
    all_preds = []
    all_labels = []
    running_loss = 0.0
    step = 0
    for batch in tqdm(train_loader):
        # Convert once. as_tensor accepts array-likes as well as tensors and
        # avoids the extra copy (and warning) of re-wrapping a tensor with
        # torch.tensor().
        batch.x = torch.as_tensor(batch.x)
        # Merge the first two dimensions into one.
        batch.x = batch.x.reshape((-1, *batch.x.shape[2:]))
        # Use GPU
        batch.to(device)
        # Reset gradients
        optimizer.zero_grad()
        # Passing the node features and the connection info
        pred = model(batch.x.float(),
                     batch.edge_index,
                     batch.batch)
        # Calculating the loss and gradients.
        # NOTE(review): the square root of the loss is optimised, not the
        # loss itself -- confirm this is intended for a classification loss.
        loss = torch.sqrt(loss_fn(pred, batch.y.long()))

        loss.backward()
        optimizer.step()

        # Update tracking
        running_loss += loss.item()
        step += 1

        all_preds.append(np.argmax(pred.cpu().detach().numpy(), axis=1))
        all_labels.append(batch.y.cpu().detach().numpy())

    all_preds = np.concatenate(all_preds).ravel()
    all_labels = np.concatenate(all_labels).ravel()
    calculate_metrics(all_preds, all_labels, epoch, "train")
    return running_loss/step

def test(epoch, model, test_loader, loss_fn):
    """Evaluate ``model`` on ``test_loader`` and return the mean batch loss.

    Also prints a sample of scores/predictions/labels and forwards the
    predictions to ``calculate_metrics`` and ``log_conf_matrix``.

    Args:
        epoch: Epoch number, forwarded to the metric/confusion-matrix helpers.
        model: Model called as ``model(x, edge_index, batch)``. The caller is
            expected to have put it in eval mode; this function does not.
        test_loader: Iterable of batches exposing ``x``, ``edge_index``,
            ``batch`` and ``y``.
        loss_fn: Loss applied to ``(pred, batch.y.long())``.

    Returns:
        Mean of the per-batch losses (sum of batch losses / number of batches).
    """
    all_preds = []
    all_preds_raw = []
    all_labels = []
    running_loss = 0.0
    step = 0
    # No gradients are needed for evaluation; disabling autograd avoids
    # building a graph for every batch.
    with torch.no_grad():
        for batch in test_loader:
            # Convert once; as_tensor avoids re-copying an existing tensor.
            batch.x = torch.as_tensor(batch.x)
            # Merge the first two dimensions into one.
            batch.x = batch.x.reshape((-1, *batch.x.shape[2:]))
            batch.to(device)
            pred = model(batch.x.float(),
                         batch.edge_index,
                         batch.batch)
            loss = torch.sqrt(loss_fn(pred, batch.y.long()))
            # Update tracking
            running_loss += loss.item()
            step += 1

            all_preds.append(np.argmax(pred.cpu().detach().numpy(), axis=1))
            # NOTE(review): sigmoid gives independent per-class scores that
            # do not sum to 1; softmax may be what is wanted here -- confirm.
            all_preds_raw.append(torch.sigmoid(pred).cpu().detach().numpy())
            all_labels.append(batch.y.cpu().detach().numpy())

    all_preds = np.concatenate(all_preds).ravel()
    all_labels = np.concatenate(all_labels).ravel()
    print(all_preds_raw[0][:10])
    print(all_preds[:10])
    print(all_labels[:10])
    calculate_metrics(all_preds, all_labels, epoch, "test")
    log_conf_matrix(all_preds, all_labels, epoch)
    return running_loss/step

Predict:

def predict(model, test_loader):
    """Run ``model`` over ``test_loader`` and collect predictions and labels.

    Args:
        model: Model called as ``model(x, edge_index, batch)``. The caller is
            expected to have put it in eval mode; this function does not.
        test_loader: Iterable of batches exposing ``x``, ``edge_index``,
            ``batch`` and ``y``.

    Returns:
        Tuple ``(all_preds, all_preds_raw, all_labels)``:
        ``all_preds`` is a flat array of argmax class indices,
        ``all_preds_raw`` is a list with one array of sigmoid scores per
        batch (not concatenated), and ``all_labels`` is a flat array of
        labels.
    """
    all_preds = []
    all_preds_raw = []
    all_labels = []

    # Inference only: disabling autograd avoids keeping a graph in memory,
    # which matters when the whole dataset is sent through as one batch.
    with torch.no_grad():
        for batch in test_loader:
            # Convert once; as_tensor avoids re-copying an existing tensor.
            batch.x = torch.as_tensor(batch.x)
            # Merge the first two dimensions into one.
            batch.x = batch.x.reshape((-1, *batch.x.shape[2:]))
            batch.to(device)
            pred = model(batch.x.float(),
                         batch.edge_index,
                         batch.batch)

            all_preds.append(np.argmax(pred.cpu().detach().numpy(), axis=1))
            # NOTE(review): sigmoid gives independent per-class scores that
            # do not sum to 1; softmax may be what is wanted here -- confirm.
            all_preds_raw.append(torch.sigmoid(pred).cpu().detach().numpy())
            all_labels.append(batch.y.cpu().detach().numpy())

    all_preds = np.concatenate(all_preds).ravel()
    all_labels = np.concatenate(all_labels).ravel()
    return all_preds, all_preds_raw, all_labels

I was using mlflow to load my model and this is what I did:

import mlflow
# MLflow URI of the logged model: the run id followed by the artifact path
# ("model") that was passed to mlflow.pytorch.log_model during training.
logged_model = 'runs:/b18929aa871047f9892aa3c84a998d28/model'

# Load model
# NOTE(review): the AttributeError in the traceback below is raised on the
# first forward pass of this loaded model; presumably the installed
# torch_geometric differs from the one used when the model was logged --
# confirm the versions match.
loaded_model = mlflow.pytorch.load_model(logged_model)

# `device` is defined elsewhere in the notebook.
loaded_model = loaded_model.to(device)
# Put the model in evaluation mode before predicting.
loaded_model.eval()

# `dataset` is defined elsewhere (the traceback shows full_dataset[3800:]).
# A batch size equal to the dataset length yields a single batch.
loader = DataLoader(dataset, batch_size=len(dataset))

all_pred, all_pred_raw, all_label = predict(loaded_model, loader)

This is the error message

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Input In [23], in <cell line: 7>()
      3 dataset = full_dataset[3800:]
      5 loader = DataLoader(dataset, batch_size=len(dataset))
----> 7 all_pred, all_pred_raw, all_label = predict(loaded_model, loader)

Input In [20], in predict(epoch, model, test_loader, loss_fn)
    143 batch.x = batch.x.reshape((-1, *batch.x.shape[2:]))
    144 batch.to(device)  
--> 145 pred = model(torch.tensor(batch.x).float(), 
    146                 #batch.edge_attr.float(),
    147                 batch.edge_index, 
    148                 batch.batch) 
    150 all_preds.append(np.argmax(pred.cpu().detach().numpy(), axis=1))
    151 all_preds_raw.append(torch.sigmoid(pred).cpu().detach().numpy())

File ~\anaconda3\lib\site-packages\torch\nn\modules\module.py:1190, in Module._call_impl(self, *input, **kwargs)
   1186 # If we don't have any hooks, we want to skip the rest of the logic in
   1187 # this function, and just call forward.
   1188 if not (self._backward_hooks or self._forward_hooks or self._forward_pre_hooks or _global_backward_hooks
   1189         or _global_forward_hooks or _global_forward_pre_hooks):
-> 1190     return forward_call(*input, **kwargs)
   1191 # Do not call functions when jit is used
   1192 full_backward_hooks, non_full_backward_hooks = [], []

File ~\FIFA_PROJECT\model.py:67, in GNN.forward(self, x, edge_index, batch_index)
     63 def forward(self, x, edge_index, batch_index):
     64 #def forward(self, x, edge_attr=None, edge_index, batch_index):
     65     # Initial transformation
     66     #x = self.conv1(x, edge_index, edge_attr)
---> 67     x = self.conv1(x, edge_index)
     68     x = torch.relu(self.transf1(x))
     69     x = torch.relu((x))

File ~\anaconda3\lib\site-packages\torch\nn\modules\module.py:1190, in Module._call_impl(self, *input, **kwargs)
   1186 # If we don't have any hooks, we want to skip the rest of the logic in
   1187 # this function, and just call forward.
   1188 if not (self._backward_hooks or self._forward_hooks or self._forward_pre_hooks or _global_backward_hooks
   1189         or _global_forward_hooks or _global_forward_pre_hooks):
-> 1190     return forward_call(*input, **kwargs)
   1191 # Do not call functions when jit is used
   1192 full_backward_hooks, non_full_backward_hooks = [], []

File ~\anaconda3\lib\site-packages\torch_geometric\nn\conv\gcn_conv.py:198, in GCNConv.forward(self, x, edge_index, edge_weight)
    195 x = self.lin(x)
    197 # propagate_type: (x: Tensor, edge_weight: OptTensor)
--> 198 out = self.propagate(edge_index, x=x, edge_weight=edge_weight,
    199                      size=None)
    201 if self.bias is not None:
    202     out = out + self.bias

File ~\anaconda3\lib\site-packages\torch_geometric\nn\conv\message_passing.py:454, in MessagePassing.propagate(self, edge_index, size, **kwargs)
    451     if res is not None:
    452         aggr_kwargs = res[0] if isinstance(res, tuple) else res
--> 454 out = self.aggregate(out, **aggr_kwargs)
    456 for hook in self._aggregate_forward_hooks.values():
    457     res = hook(self, (aggr_kwargs, ), out)

File ~\anaconda3\lib\site-packages\torch_geometric\nn\conv\message_passing.py:578, in MessagePassing.aggregate(self, inputs, index, ptr, dim_size)
    565 def aggregate(self, inputs: Tensor, index: Tensor,
    566               ptr: Optional[Tensor] = None,
    567               dim_size: Optional[int] = None) -> Tensor:
    568     r"""Aggregates messages from neighbors as
    569     :math:`\square_{j \in \mathcal{N}(i)}`.
    570 
   (...)
    576     as specified in :meth:`__init__` by the :obj:`aggr` argument.
    577     """
--> 578     return self.aggr_module(inputs, index, ptr=ptr, dim_size=dim_size,
    579                             dim=self.node_dim)

File ~\anaconda3\lib\site-packages\torch\nn\modules\module.py:1265, in Module.__getattr__(self, name)
   1263     if name in modules:
   1264         return modules[name]
-> 1265 raise AttributeError("'{}' object has no attribute '{}'".format(
   1266     type(self).__name__, name))

AttributeError: 'TransformerConv' object has no attribute 'aggr_module'

Please I'm begging :(

I wrote the predict function but it didn't come out as expected. Pls send help, would be grateful for any suggestions.


Solution

  • I’ve gotten the solution from pyg discussion here

    So basically, you can get around this by iterating over all `MessagePassing` layers of the loaded model and setting the missing `aggr_module` attribute on each of them:

    from torch_geometric.nn import MessagePassing
    from torch_geometric.nn.aggr import SumAggregation

    loaded_model = mlflow.pytorch.load_model(logged_model)
    # Walk every sub-module instead of only `conv_layers`: the model also
    # holds a TransformerConv in `conv1`, which is where the traceback fails.
    # SumAggregation corresponds to aggr="add", the TransformerConv default;
    # a layer built with a different aggregation needs the matching module.
    # This is a workaround -- loading with the same torch_geometric version
    # that was used to log the model avoids the problem altogether.
    for module in loaded_model.modules():
        if isinstance(module, MessagePassing) and not hasattr(module, "aggr_module"):
            module.aggr_module = SumAggregation()
    

    This should fix the problem!