Source code for ptmelt.models

import warnings
from typing import Optional

import torch
import torch.nn as nn
from tqdm import tqdm

from ptmelt.blocks import DefaultOutput, DenseBlock, MixtureDensityOutput, ResidualBlock
from ptmelt.losses import MixtureDensityLoss


class MELTModel(nn.Module):
    """
    PT-MELT Base model.

    Args:
        num_features (int): The number of input features.
        num_outputs (int): The number of output units.
        width (int, optional): The width of the hidden layers. Defaults to 32.
        depth (int, optional): The number of hidden layers. Defaults to 2.
        act_fun (str, optional): The activation function to use. Defaults to 'relu'.
        dropout (float, optional): The dropout rate. Defaults to 0.0.
        input_dropout (float, optional): The input dropout rate. Defaults to 0.0.
        batch_norm (bool, optional): Whether to use batch normalization. Defaults
            to False.
        batch_norm_type (str, optional): The type of batch normalization to use.
            Defaults to 'ema'.
        use_batch_renorm (bool, optional): Whether to use batch renormalization.
            Defaults to False.
        output_activation (str, optional): The activation function for the output
            layer. Defaults to None.
        initializer (str, optional): The weight initializer to use. Defaults to
            'glorot_uniform'.
        l1_reg (float, optional): The L1 regularization strength. Defaults to 0.0.
        l2_reg (float, optional): The L2 regularization strength. Defaults to 0.0.
        num_mixtures (int, optional): The number of mixture components for MDN.
            Defaults to 0.
        node_list (list, optional): An explicit per-layer list of node counts;
            when given, it overrides width and depth. Defaults to None.
        **kwargs: Additional keyword arguments.
    """

    def __init__(
        self,
        num_features: int,
        num_outputs: int,
        width: Optional[int] = 32,
        depth: Optional[int] = 2,
        act_fun: Optional[str] = "relu",
        dropout: Optional[float] = 0.0,
        input_dropout: Optional[float] = 0.0,
        batch_norm: Optional[bool] = False,
        batch_norm_type: Optional[str] = "ema",
        use_batch_renorm: Optional[bool] = False,
        output_activation: Optional[str] = None,
        initializer: Optional[str] = "glorot_uniform",
        l1_reg: Optional[float] = 0.0,
        l2_reg: Optional[float] = 0.0,
        num_mixtures: Optional[int] = 0,
        node_list: Optional[list] = None,
        **kwargs,
    ):
        super(MELTModel, self).__init__(**kwargs)
        self.num_features = num_features
        self.num_outputs = num_outputs
        self.width = width
        self.depth = depth
        self.act_fun = act_fun
        self.dropout = dropout
        self.input_dropout = input_dropout
        self.batch_norm = batch_norm
        self.batch_norm_type = batch_norm_type
        self.use_batch_renorm = use_batch_renorm
        self.output_activation = output_activation
        self.initializer = initializer
        self.l1_reg = l1_reg
        self.l2_reg = l2_reg
        self.num_mixtures = num_mixtures
        self.node_list = node_list

        # Determine if network should be defined based on depth/width or node_list
        if self.node_list:
            self.num_layers = len(self.node_list)
            self.layer_width = self.node_list
        else:
            self.num_layers = self.depth
            self.layer_width = [self.width for _ in range(self.depth)]

        # Create list for storing names of sub-layers
        self.sub_layer_names = []

        # Create layer dictionary
        self.layer_dict = nn.ModuleDict()

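    # Documentation aside (not library code): node_list, when given, takes
    # precedence over width/depth in the branch above. With hypothetical values:
    #
    #     MELTModel(num_features=8, num_outputs=1, width=32, depth=2)
    #         -> layer_width == [32, 32]
    #     MELTModel(num_features=8, num_outputs=1, node_list=[64, 32, 16])
    #         -> layer_width == [64, 32, 16]
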
    def build(self):
        """Build the model."""
        self.initialize_layers()

    def initialize_layers(self):
        """Initialize the layers of the model."""
        self.create_dropout_layers()
        self.create_output_layer()

    def create_dropout_layers(self):
        """Create the dropout layers."""
        if self.input_dropout > 0:
            self.layer_dict.update(
                {"input_dropout": nn.Dropout(p=self.input_dropout)}
            )

    def create_output_layer(self):
        """Create the output layer."""
        if self.num_mixtures > 0:
            self.layer_dict.update(
                {
                    "output": MixtureDensityOutput(
                        input_features=self.layer_width[-1],
                        num_mixtures=self.num_mixtures,
                        num_outputs=self.num_outputs,
                        activation=self.output_activation,
                        initializer=self.initializer,
                    )
                }
            )
            self.sub_layer_names.append("output")
        else:
            self.layer_dict.update(
                {
                    "output": DefaultOutput(
                        input_features=self.layer_width[-1],
                        output_features=self.num_outputs,
                        activation=self.output_activation,
                        initializer=self.initializer,
                    )
                }
            )
            self.sub_layer_names.append("output")

    def compute_jacobian(self, x):
        """Compute the Jacobian of the model with respect to the input."""
        pass

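    # Documentation aside: compute_jacobian() is left as a stub above. A minimal
    # sketch of one possible implementation using
    # torch.autograd.functional.jacobian -- an assumption, not necessarily the
    # authors' intended approach, and valid only for the default (non-MDN)
    # output head:
    #
    #     from torch.autograd.functional import jacobian
    #
    #     def compute_jacobian(self, x):
    #         # x: a single input row of shape (num_features,);
    #         # returns a (num_outputs, num_features) Jacobian matrix.
    #         self.eval()
    #         return jacobian(lambda z: self(z.unsqueeze(0)).squeeze(0), x)
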
    def l1_regularization(self, lambda_l1: float):
        """
        Compute the L1 regularization term for use in the loss function.

        Args:
            lambda_l1 (float): The L1 regularization strength.
        """
        l1_norm = sum(
            p.abs().sum()
            for name, p in self.named_parameters()
            if p.requires_grad and "weight" in name
        )
        return lambda_l1 * l1_norm

    def l2_regularization(self, lambda_l2: float):
        """
        Compute the L2 regularization term for use in the loss function.

        Args:
            lambda_l2 (float): The L2 regularization strength.
        """
        l2_norm = sum(
            p.pow(2.0).sum()
            for name, p in self.named_parameters()
            if p.requires_grad and "weight" in name
        )
        return 0.5 * lambda_l2 * l2_norm

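    # Documentation aside: both penalties above sum only over parameters whose
    # names contain "weight", so biases are excluded. A quick consistency check
    # against the formula 0.5 * lambda_l2 * sum(w**2) (hypothetical sketch):
    #
    #     model = ArtificialNeuralNetwork(num_features=4, num_outputs=1)
    #     model.build()
    #     manual = 0.5 * 1e-2 * sum(
    #         p.pow(2).sum()
    #         for n, p in model.named_parameters()
    #         if p.requires_grad and "weight" in n
    #     )
    #     assert torch.allclose(model.l2_regularization(lambda_l2=1e-2), manual)
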
    def get_loss_fn(
        self, loss: Optional[str] = "mse", reduction: Optional[str] = "mean"
    ):
        """
        Get the loss function for the model. Used in the training loop.

        Args:
            loss (str, optional): The loss function to use. Defaults to 'mse'.
            reduction (str, optional): The reduction method for the loss.
                Defaults to 'mean'.
        """
        if self.num_mixtures > 0:
            warnings.warn(
                "Mixture Density Networks require the use of the MixtureDensityLoss "
                "class. The loss function will be set automatically."
            )
            return MixtureDensityLoss(
                num_mixtures=self.num_mixtures, num_outputs=self.num_outputs
            )
        elif loss == "mse":
            return nn.MSELoss(reduction=reduction)
        else:
            raise ValueError(f"Loss function {loss} not recognized.")

    def fit(self, train_dl, val_dl, optimizer, criterion, num_epochs: int):
        """
        Perform the model training loop.

        Args:
            train_dl (DataLoader): The training data loader.
            val_dl (DataLoader): The validation data loader.
            optimizer (Optimizer): The optimizer to use.
            criterion (Loss): The loss function to use.
            num_epochs (int): The number of epochs to train the model.
        """
        # Create history dictionary
        if not hasattr(self, "history"):
            self.history = {"loss": [], "val_loss": []}

        for epoch in tqdm(range(num_epochs)):
            # Put model in training mode
            self.train()

            running_loss = 0.0
            for x_in, y_in in train_dl:
                # Forward pass
                pred = self(x_in)
                loss = criterion(pred, y_in)

                # Add L1 and L2 regularization if present
                if self.l1_reg > 0:
                    loss += self.l1_regularization(lambda_l1=self.l1_reg)
                if self.l2_reg > 0:
                    loss += self.l2_regularization(lambda_l2=self.l2_reg)

                # Zero the parameter gradients
                optimizer.zero_grad()
                # Backward pass
                loss.backward()
                # Optimize
                optimizer.step()

                # Accumulate statistics
                running_loss += loss.item()

            # Normalize loss
            running_loss /= len(train_dl)

            # Put model in evaluation mode
            self.eval()

            # Compute validation loss
            running_val_loss = 0.0
            with torch.no_grad():
                for x_val, y_val in val_dl:
                    pred_val = self(x_val)
                    val_loss = criterion(pred_val, y_val)
                    running_val_loss += val_loss.item()
            running_val_loss /= len(val_dl)

            # Print statistics
            if (epoch + 1) % 10 == 0:
                print(
                    f"Epoch {epoch + 1}, Loss: {running_loss:.4f}, "
                    f"Val Loss: {running_val_loss:.4f}"
                )

            # Save history
            self.history["loss"].append(running_loss)
            self.history["val_loss"].append(running_val_loss)

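    # Documentation aside (hypothetical sketch): fit() expects the optimizer and
    # criterion to be constructed externally, e.g.
    #
    #     optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    #     criterion = model.get_loss_fn(loss="mse")
    #     model.fit(train_dl, val_dl, optimizer, criterion, num_epochs=50)
    #
    # The L1/L2 penalties are added inside the loop, so l1_reg/l2_reg set at
    # construction time take effect without any extra wiring.
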
class ArtificialNeuralNetwork(MELTModel):
    """
    Artificial Neural Network (ANN) model.

    Args:
        **kwargs: Additional keyword arguments.
    """

    def __init__(
        self,
        **kwargs,
    ):
        super(ArtificialNeuralNetwork, self).__init__(**kwargs)

    def initialize_layers(self):
        """Initialize the layers of the ANN."""
        super(ArtificialNeuralNetwork, self).initialize_layers()

        # Bulk layers
        self.layer_dict.update(
            {
                "dense_block": DenseBlock(
                    input_features=self.num_features,
                    node_list=self.layer_width,
                    activation=self.act_fun,
                    dropout=self.dropout,
                    batch_norm=self.batch_norm,
                    batch_norm_type=self.batch_norm_type,
                    use_batch_renorm=self.use_batch_renorm,
                    initializer=self.initializer,
                )
            }
        )
        self.sub_layer_names.append("dense_block")

    def forward(self, inputs: torch.Tensor):
        """
        Perform the forward pass of the ANN.

        Args:
            inputs (torch.Tensor): The input data.
        """
        # Apply input dropout
        x = (
            self.layer_dict["input_dropout"](inputs)
            if self.input_dropout > 0
            else inputs
        )

        # Apply dense block
        x = self.layer_dict["dense_block"](x)

        # Apply the output layer(s) and return
        return self.layer_dict["output"](x)

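# Documentation aside (hypothetical sketch): a forward-pass shape check for the
# ANN, assuming the default (non-MDN) output head returns a (batch, num_outputs)
# tensor:
#
#     net = ArtificialNeuralNetwork(num_features=10, num_outputs=3)
#     net.build()  # build() must be called before the first forward pass
#     assert net(torch.randn(16, 10)).shape == (16, 3)
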
class ResidualNeuralNetwork(MELTModel):
    """
    Residual Neural Network (ResNet) model.

    Args:
        layers_per_block (int, optional): The number of layers per residual block.
            Defaults to 2.
        pre_activation (bool, optional): Whether to use pre-activation. Defaults
            to True.
        post_add_activation (bool, optional): Whether to use activation after
            addition. Defaults to False.
        **kwargs: Additional keyword arguments.
    """

    def __init__(
        self,
        layers_per_block: Optional[int] = 2,
        pre_activation: Optional[bool] = True,
        post_add_activation: Optional[bool] = False,
        **kwargs,
    ):
        super(ResidualNeuralNetwork, self).__init__(**kwargs)
        self.layers_per_block = layers_per_block
        self.pre_activation = pre_activation
        self.post_add_activation = post_add_activation

    def build(self):
        """Build the model."""
        if self.depth % self.layers_per_block != 0:
            warnings.warn(
                f"Depth ({self.depth}) is not divisible by layers_per_block "
                f"({self.layers_per_block}), so the last block will have "
                f"{self.depth % self.layers_per_block} layers."
            )
        # The parent build() already calls initialize_layers(), which dispatches
        # to the override below; calling it here as well would register the
        # layers twice.
        super(ResidualNeuralNetwork, self).build()

    def initialize_layers(self):
        """Initialize the layers of the ResNet."""
        super(ResidualNeuralNetwork, self).initialize_layers()

        # Create the Residual Block
        self.layer_dict.update(
            {
                "residual_block": ResidualBlock(
                    layers_per_block=self.layers_per_block,
                    pre_activation=self.pre_activation,
                    post_add_activation=self.post_add_activation,
                    input_features=self.num_features,
                    node_list=self.layer_width,
                    activation=self.act_fun,
                    dropout=self.dropout,
                    batch_norm=self.batch_norm,
                    batch_norm_type=self.batch_norm_type,
                    use_batch_renorm=self.use_batch_renorm,
                    initializer=self.initializer,
                )
            }
        )
        self.sub_layer_names.append("residual_block")

    def forward(self, inputs: torch.Tensor):
        """
        Perform the forward pass of the ResNet.

        Args:
            inputs (torch.Tensor): The input data.
        """
        # Apply input dropout
        x = (
            self.layer_dict["input_dropout"](inputs)
            if self.input_dropout > 0
            else inputs
        )

        # Apply residual block
        x = self.layer_dict["residual_block"](x)

        # Apply the output layer(s) and return
        return self.layer_dict["output"](x)
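

# ---------------------------------------------------------------------------
# Documentation aside: a minimal end-to-end sketch using synthetic data, with
# hyperparameters chosen purely for illustration. None of this block is part
# of the ptmelt library itself.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from torch.utils.data import DataLoader, TensorDataset

    # Synthetic regression data: 8 features, 2 targets
    x = torch.randn(256, 8)
    y = torch.randn(256, 2)
    train_dl = DataLoader(
        TensorDataset(x[:200], y[:200]), batch_size=32, shuffle=True
    )
    val_dl = DataLoader(TensorDataset(x[200:], y[200:]), batch_size=32)

    # Residual network; depth is chosen divisible by layers_per_block so that
    # build() does not warn about a ragged final block.
    model = ResidualNeuralNetwork(
        num_features=8, num_outputs=2, width=32, depth=4, layers_per_block=2
    )
    model.build()

    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    criterion = model.get_loss_fn(loss="mse")
    model.fit(train_dl, val_dl, optimizer, criterion, num_epochs=20)

    # Final training/validation losses recorded by fit()
    print({k: v[-1] for k, v in model.history.items()})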