I'm new to seq2seq models in PyTorch, and I want to build a model that takes text as input and outputs a sequence of numeric values.
For example, 'en_ids' is the input sentence, already converted to word indices, and 'NPY_DATA' is the numeric target sequence for that input. Here is train_data[0]:
{'NPY_DATA': tensor([2.0000, 0.5201, 0.3295, ..., 0.0000, 0.0000, 3.0000]),
'en_ids': tensor([ 2, 102, 0, 10, 0, 4, 0, 3]),
'SENTENCE': 'i call it painting the wall',
'en_tokens': ['<sos>', 'i', 'call', 'it', 'painting', 'the', 'wall', '<eos>']}
I defined the encoder like this, following the general seq2seq encoder architecture.
class Encoder(nn.Module):
    """Embed a source token sequence and summarise it with a multi-layer LSTM.

    Only the LSTM's final hidden and cell states are returned; its per-step
    outputs are discarded.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        # input_dim is the number of rows in the embedding table.
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, src):
        """Return the final ``(hidden, cell)`` states for ``src``.

        ``src`` holds token indices laid out as [src length, batch size].
        Both returned tensors are [n layers, batch size, hidden dim].
        """
        # [src length, batch size] -> [src length, batch size, embedding dim]
        token_vectors = self.embedding(src)
        rnn_input = self.dropout(token_vectors)
        # The per-step outputs (top layer only) are not needed by the caller.
        _, final_state = self.rnn(rnn_input)
        final_hidden, final_cell = final_state
        return final_hidden, final_cell
For the decoder, I am not sure whether I should use an embedding layer, because my outputs are numeric values rather than word indices.
class Decoder(nn.Module):
    """Single-step LSTM decoder that predicts one output per call.

    Each call consumes the previous output token together with the recurrent
    state and returns scores over ``output_dim`` classes plus the updated
    state, so the caller can unroll the decoder one step at a time.

    NOTE(review): the embedding layer requires ``input`` to contain integer
    indices in ``[0, output_dim)``. If the targets are continuous values, the
    embedding would have to be replaced by e.g. a linear input projection --
    confirm against the data before training.
    """

    def __init__(self, output_dim, embedding_dim, hidden_dim, n_layers, dropout):
        super().__init__()
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(output_dim, embedding_dim)
        self.rnn = nn.LSTM(embedding_dim, hidden_dim, n_layers, dropout=dropout)
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        """Run one decoding step.

        Args:
            input: previous output tokens, [batch size].
            hidden: LSTM hidden state, [n layers, batch size, hidden dim].
            cell: LSTM cell state, [n layers, batch size, hidden dim].

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` is
            [batch size, output dim] and the states keep their input shape.
        """
        # Add a sequence axis of length 1: [batch size] -> [1, batch size]
        input = input.unsqueeze(0)
        # embedded = [1, batch size, embedding dim]
        embedded = self.dropout(self.embedding(input))
        # The LSTM must receive the embedded step AND the incoming state;
        # feeding it the raw indices gives the wrong input rank/width and
        # silently throws away the encoder's context.
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        # output = [1, batch size, hidden dim]; drop the length-1 sequence axis
        prediction = self.fc_out(output.squeeze(0))
        # prediction = [batch size, output dim]
        return prediction, hidden, cell
My final Seq2Seq model:
Seq2Seq(
(encoder): Encoder(
(embedding): Embedding(172, 300)
(rnn): LSTM(300, 1024, num_layers=2, dropout=0.5)
(dropout): Dropout(p=0.5, inplace=False)
)
(decoder): Decoder(
(embedding): Embedding(1662, 300)
(rnn): LSTM(300, 1024, num_layers=2, dropout=0.5)
(fc_out): Linear(in_features=1024, out_features=1662, bias=True)
(dropout): Dropout(p=0.5, inplace=False)
)
)
When I train the model, I get the error "Dimension out of range (expected to be in range of [-1, 0], but got 1)". I think it comes from train_fn:
def train_fn(
    model, data_loader, optimizer, criterion, clip, teacher_forcing_ratio, device
):
    """Train ``model`` for one pass over ``data_loader``.

    Each batch is a mapping with an ``"en_ids"`` source tensor and an
    ``"NPY_DATA"`` target tensor. Gradients are clipped to a total norm of
    ``clip`` before every optimizer step.

    Returns:
        The mean of the per-batch losses.
    """
    model.train()
    running_loss = 0
    for batch in data_loader:
        # source = [src length, batch size], target = [trg length, batch size]
        source = batch["en_ids"].to(device)
        target = batch["NPY_DATA"].to(device)

        optimizer.zero_grad()
        # scores = [trg length, batch size, trg vocab size]
        scores = model(source, target, teacher_forcing_ratio)

        # Skip the first time step (presumably the start-of-sequence slot --
        # confirm against the model) and flatten the time and batch axes so
        # the criterion sees one row per predicted element.
        flat_scores = scores[1:].view(-1, scores.shape[-1])
        flat_target = target[1:].view(-1)

        batch_loss = criterion(flat_scores, flat_target)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        running_loss += batch_loss.item()
    return running_loss / len(data_loader)
Example of training a sequence-to-sequence encoder-decoder model, where the input is text and the variable target sequence is numerical.
Output:
Model size (kParams): 7.056
[epoch 1] train loss: 5.004
[epoch 100] train loss: 2.737
...
[epoch 600] train loss: 1.106
[epoch 700] train loss: 0.690
Inference run on training set:
Sample 0: I want to eat fruit
Target: [1, 2, 4, 5, -1]
Model: [0, 3, 4, 5, -1]
Sample 1: It is a cat
Target: [-2, 12, 3, 5, 6, 7]
Model: [-1, 12, 3, 5, 6, 7, -9, 9]
Sample 2: The aileron is the control surface in the wing...
Target: [1, 6, 3, 5, -1]
Model: [1, 6, 3, 5, -1]
Sample 3: By moving the stick, you cause pressure to...
Target: [1, 9, 2, 5, -1, 4]
Model: [1, 9, 3, 4, -1, 4]
Sample 4: The elevator is the part that moves with the...
Target: [7, 8, 9, 5, -2, 5]
Model: [2, 8, 5, 6, -2, 5]
...
The sentences are first tokenised into words. All of the words form the vocabulary of the dataset, and the vocabulary list is used to map each word to a numerical value.
The encoder is a sequence-to-vector model that maps the input sequence to a single encoding. The encoder first maps the input tokens to a vector using an embedding layer. Then the encoder's LSTM steps through each word in the sentence until it reaches the end. The final cell state vector represents the encoding for this sentence, and is fed to the decoder.
The decoder maps the encoding to a numerical sequence that's the same length as the target sequence. The input to the decoder is first duplicated to match the length of the target sequence. This means that we get an output sequence length that matches the target sequence length. The loss is computed between the output sequence and the target sequence, and the optimiser is stepped. The model is fed one sample at a time, as batching would require some complexity to manage the different sequence lengths.
For inference, we don't have a target (i.e. we don't know how long the output should be), so we keep running the decoder as many times as necessary until it outputs a stop token.
The dataset is just a few samples for demo purposes, but you'll also need a validation set which I've not shown.
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# Load the samples: one input sentence and one comma-separated target per row
df = pd.read_csv('sample_data.csv')
sentences = df['input'].values
# Parse each target string into a list of ints
targets = [list(map(int, target.split(','))) for target in df['target'].values]

#
# Define the vocabulary
#


def _tokenise(sentence):
    """Lower-case a sentence and split it into word tokens.

    Full stops and commas become the standalone tokens '<EOS>' and '<COMMA>'.
    """
    marked = sentence.lower().replace('.', ' <EOS>').replace(',', ' <COMMA>')
    return marked.split()


sentences_tokenised = [_tokenise(sentence) for sentence in sentences]
all_tokens = np.concatenate(sentences_tokenised)

# Vocabulary list: every distinct token in sorted order, followed by the two
# punctuation markers
vocab = sorted(set(all_tokens)) + ['<EOS>', '<COMMA>']
# Map each word to its position in the vocabulary list (for a repeated word,
# the last position wins)
vocab_dict = dict(zip(vocab, range(len(vocab))))

#
# Create a dataset
#

# Re-express every tokenised sentence as a list of vocabulary indices
sentences_numerical = [
    [vocab_dict[word] for word in words] for words in sentences_tokenised
]
# Dataset that yields one (X, y) pair per sample:
#   X: the sentence as a 1-D long tensor of vocabulary indices
#   y: the target as a 1-D float tensor with the end-of-target marker appended
class MyDataset(Dataset):
    def __init__(self, sentences_numerical, targets, end_of_target=-10):
        self.sentences_numerical = sentences_numerical
        self.targets = targets
        # Value appended to every target to mark where the sequence stops
        self.end_of_target = end_of_target

    def __len__(self):
        return len(self.sentences_numerical)

    def __getitem__(self, index):
        token_ids = self.sentences_numerical[index]
        # List concatenation builds a new list, leaving the stored target as is
        target_with_stop = self.targets[index] + [self.end_of_target]
        features = torch.tensor(token_ids, dtype=torch.long)
        labels = torch.tensor(target_with_stop).float()
        return features, labels
# Training set over the index-encoded sentences, using the default end-of-target marker
train_dataset = MyDataset(sentences_numerical=sentences_numerical, targets=targets)
#
# Define a simple encoder-decoder model
#
# Wraps an arbitrary callable as a module so it can be used as a layer
class LambdaLayer(nn.Module):
    """Module whose forward pass simply applies the wrapped callable."""

    def __init__(self, func):
        super().__init__()
        self.func = func

    def forward(self, x):
        result = self.func(x)
        return result
# Embedding layer: turns each token index into a learned vector
#   input:  a sequence of L token indices
#   output: (L, embedding_size)
embedding_size = 16
embedding_layer = nn.Embedding(len(vocab), embedding_size)

# Encoder LSTM
#   input:  (L, embedding_size)
#   output: output, (h_n, c_n)
#     output is (L, hidden_size)
#     h_n and c_n are both (num_layers, hidden_size)
encoder_hidden_size = 16
lstm_encoder = nn.LSTM(embedding_size, encoder_hidden_size, num_layers=2)


def _last_layer_cell_state(lstm_result):
    """Pick the last layer's c_n out of an LSTM's (output, (h_n, c_n)) result."""
    _, (_, c_n) = lstm_result
    return c_n[-1, :]


# Encoder net: token indices -> embeddings -> LSTM -> final-layer cell state,
# shaped (hidden_size,)
encoder = nn.Sequential(
    embedding_layer,
    lstm_encoder,
    LambdaLayer(_last_layer_cell_state),
)
# Decoder: expands one encoding vector into a sequence of target values
#   input:  encoding of size hidden_size
#   output: (target L, target_size)
class Decoder(nn.Module):
    def __init__(self, input_size, hidden_size, target_size):
        super().__init__()
        # proj_size makes the LSTM emit target_size values at every step
        self.lstm_decoder = nn.LSTM(
            input_size, hidden_size, proj_size=target_size, num_layers=2
        )

    def forward(self, x, target_L):
        # Present the same encoding at every step: (target L, enc hidden_size)
        repeated_encoding = x.reshape(1, -1).repeat(target_L, 1)
        # Only the per-step outputs are used; the final states are dropped
        step_outputs = self.lstm_decoder(repeated_encoding)[0]
        return step_outputs
# Decoder whose input and hidden sizes both match the encoder's hidden size,
# emitting one value per step
decoder = Decoder(encoder_hidden_size, encoder_hidden_size, target_size=1)

# Group both halves so their parameters can be handed to a single optimiser
encoder_decoder = nn.Sequential(encoder, decoder)
optimiser = torch.optim.NAdam(encoder_decoder.parameters())

# Report the number of trainable parameters, in thousands
n_trainable = sum(
    p.numel() for p in encoder_decoder.parameters() if p.requires_grad
)
print('Model size (kParams):', n_trainable / 1e3)
#
# Train
#
# The loss module holds no per-step state, so build it once up front
loss_fn = nn.HuberLoss()

for epoch in range(700):
    encoder_decoder.train()

    # Visit the samples in a fresh random order every epoch
    shuffled_ixs = torch.randperm(len(train_dataset))
    cumulative_loss = 0.0

    # One sample per step: sequence lengths differ, so there is no batching
    for sample_ix in shuffled_ixs:
        sample, target = train_dataset[sample_ix]

        encoding = encoder(sample)
        # Ask the decoder for exactly as many steps as the target has
        predicted_sequence = decoder(encoding, target_L=len(target))
        loss = loss_fn(predicted_sequence.flatten(), target)

        #Step optimiser
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

        # Accumulate a plain float: adding the loss tensor itself would keep
        # every step's autograd history reachable for the whole epoch
        cumulative_loss += loss.item()

    # Report the mean loss on the first epoch and every 100th epoch
    if epoch == 0 or (epoch + 1) % 100 == 0:
        print(
            f'[epoch {epoch+1:>3d}]',
            f'train loss: {cumulative_loss/len(train_dataset):>5.3f}'
        )
#
# View results (training set)
#
@torch.no_grad()
def get_predicted_sequence(encoder, decoder, sample, max_L=10, end_of_target=None):
    """Decode ``sample`` and cut the prediction at the first stop marker.

    The decoder is run for ``max_L`` steps. The result is truncated just
    before the first element that rounds to ``end_of_target``; if no element
    does, all ``max_L`` values are returned.

    Args:
        encoder: module mapping ``sample`` to an encoding.
        decoder: module called as ``decoder(encoding, target_L=max_L)``.
        sample: input passed to ``encoder``.
        max_L: number of decoder steps to generate.
        end_of_target: stop-marker value. When ``None`` the marker of the
            module-level ``train_dataset`` is used.

    Returns:
        1-D tensor of (unrounded) predicted values, stop marker excluded.

    Side effects:
        Both modules are switched to eval mode and left that way.
    """
    if end_of_target is None:
        end_of_target = train_dataset.end_of_target

    encoder.eval()
    decoder.eval()

    encoding = encoder(sample)
    predicted_sequence = decoder(encoding, target_L=max_L).ravel()

    # Search with torch rather than numpy so the tensor never has to be
    # converted to an array; nonzero() lists positions in ascending order.
    stop_positions = torch.nonzero(predicted_sequence.round() == end_of_target).flatten()
    if stop_positions.numel() == 0:
        return predicted_sequence
    return predicted_sequence[: int(stop_positions[0])]
# Print each training sentence with its target and the model's rounded prediction
for i, (sample, target) in enumerate(train_dataset):
    print(f'Sample {i}:', sentences[i])

    # The last target element is the appended end-of-target marker; hide it
    target_values = target[:-1].to(int).tolist()
    print(' Target:', target_values)

    prediction = get_predicted_sequence(encoder, decoder, sample)
    predicted_values = prediction.ravel().round().to(int).tolist()
    print(' Model: ', predicted_values)
    print()