I'm trying to build a simple single-headed transformer with these layers: input -> embedding -> single-head transformer -> dense -> output. I think I may have done too much, and now I keep getting an error saying my input and output tensor sizes don't match. I think the problem is in my forward method, because that's what my debugging statements point to.
I'm going to paste in my code for the entire transformer:
Data input:
import torch
from torch.utils.data import Dataset, DataLoader
import ast
# my vocab is already previously defined
token_to_index = {token: idx for idx, token in enumerate(vocab)}
index_to_token = {idx: token for token, idx in token_to_index.items()}
#result_df['tokens'] = result_df['tokens'].apply(ast.literal_eval)
result_df['numerical_tokens'] = result_df['tokens'].apply(lambda tokens: [token_to_index.get(token, 0) for token in tokens])
class TokenSequenceDataset(Dataset):
def __init__(self, sequences, seq_len):
self.sequences = sequences
self.seq_len = seq_len
def __len__(self):
return len(self.sequences)
def __getitem__(self, idx):
sequence = self.sequences[idx]
sequence = sequence[:self.seq_len]
padding = [0] * (self.seq_len - len(sequence)) # padding
sequence = sequence + padding
x = sequence[:-1]
y = sequence[1:]
return torch.tensor(x, dtype=torch.long), torch.tensor(y, dtype=torch.long)
max_seq_length = 50
numerical_sequences = result_df['numerical_tokens'].tolist()
dataset = TokenSequenceDataset(numerical_sequences, max_seq_length)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
# print batch of data
for batch in dataloader:
x, y = batch
print("Input batch:", x)
print("Target batch:", y)
break
Single-headed transformer:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy
from torch.utils.data import Dataset, DataLoader
class SingleHeadAttention(nn.Module):
def __init__(self, d_model):
super(SingleHeadAttention, self).__init__()
self.d_model = d_model
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
def scaled_dot_product_attention(self, Q, K, V, mask=None):
attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_model)
if mask is not None:
attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
attn_probs = torch.softmax(attn_scores, dim=-1)
output = torch.matmul(attn_probs, V)
return output
def forward(self, Q, K, V, mask=None):
Q = self.W_q(Q)
K = self.W_k(K)
V = self.W_v(V)
attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
output = self.W_o(attn_output)
return output
class PositionWiseFeedForward(nn.Module):
def __init__(self, d_model, d_ff):
super(PositionWiseFeedForward, self).__init__()
self.fc1 = nn.Linear(d_model, d_ff)
self.fc2 = nn.Linear(d_ff, d_model)
self.relu = nn.ReLU()
def forward(self, x):
return self.fc2(self.relu(self.fc1(x)))
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_seq_length):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_seq_length, d_model)
position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
self.register_buffer('pe', pe.unsqueeze(0))
def forward(self, x):
return x + self.pe[:, :x.size(1)]
class EncoderLayer(nn.Module):
def __init__(self, d_model, d_ff, dropout):
super(EncoderLayer, self).__init__()
self.self_attn = SingleHeadAttention(d_model)
self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask):
attn_output = self.self_attn(x, x, x, mask)
x = self.norm1(x + self.dropout(attn_output))
ff_output = self.feed_forward(x)
x = self.norm2(x + self.dropout(ff_output))
return x
class DecoderLayer(nn.Module):
def __init__(self, d_model, d_ff, dropout):
super(DecoderLayer, self).__init__()
self.self_attn = SingleHeadAttention(d_model)
self.cross_attn = SingleHeadAttention(d_model)
self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x, enc_output, src_mask, tgt_mask):
attn_output = self.self_attn(x, x, x, tgt_mask)
x = self.norm1(x + self.dropout(attn_output))
attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
x = self.norm2(x + self.dropout(attn_output))
ff_output = self.feed_forward(x)
x = self.norm3(x + self.dropout(ff_output))
return x
class Transformer(nn.Module):
def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_layers, d_ff, max_seq_length, dropout):
super(Transformer, self).__init__()
self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
self.positional_encoding = PositionalEncoding(d_model, max_seq_length)
self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, d_ff, dropout) for _ in range(num_layers)])
self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, d_ff, dropout) for _ in range(num_layers)])
self.fc = nn.Linear(d_model, tgt_vocab_size)
self.dropout = nn.Dropout(dropout)
def generate_mask(self, src, tgt):
src_mask = (src != 0).unsqueeze(1).unsqueeze(2) # Shape: [batch_size, 1, 1, src_len]
tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3) # Shape: [batch_size, 1, tgt_len, 1]
seq_length = tgt.size(1)
nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool() # Shape: [1, tgt_len, tgt_len]
tgt_mask = tgt_mask & nopeak_mask # Shape: [batch_size, 1, tgt_len, tgt_len]
return src_mask, tgt_mask
# Something here is giving me issues I haven't been able to figure out: the tensor being output has four dimensions, while the model should be working with only three. This hasn't been an issue with my multi-headed transformer.
def forward(self, src, tgt):
src_mask, tgt_mask = self.generate_mask(src, tgt)
src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
print("Src embedded shape:", src_embedded.shape) # Debugging statement
tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))
print("Tgt embedded shape:", tgt_embedded.shape) # Debugging statement
enc_output = src_embedded
for enc_layer in self.encoder_layers:
enc_output = enc_layer(enc_output, src_mask)
print("Enc output shape after layer:", enc_output.shape) # Debugging statement
dec_output = tgt_embedded
for dec_layer in self.decoder_layers:
dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)
print("Dec output shape after layer:", dec_output.shape) # Debugging statement
# Assuming self.fc should produce [batch_size, seq_len, vocab_size]
dec_output_reshaped = dec_output.view(-1, dec_output.size(-1))
output = self.fc(dec_output_reshaped).view(dec_output.size(0), dec_output.size(1), -1)
print("Final output shape:", output.shape) # Debugging statement
return output
Training loop:
src_vocab_size = len(token_to_index)
tgt_vocab_size = len(token_to_index)
d_model = 128
num_layers = 6
d_ff = 512
max_seq_length = 50 # should there be an adjustment here?
dropout = 0.1
# model instance
model = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_layers, d_ff, max_seq_length, dropout)
# defining loss function
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=0.001)
# training loop
num_epochs = 10
for epoch in range(num_epochs):
model.train()
total_loss = 0
for x, y in dataloader:
optimizer.zero_grad()
output = model(x, y[:, :-1])
print("Output size before reshaping:", output.size()) # debugging statement
batch_size, seq_len, vocab_size = output.size()
# Ensure the output is reshaped correctly
output_reshaped = output.view(batch_size * seq_len, vocab_size)
target_reshaped = y[:, 1:].contiguous().view(-1)
# The assertion should now pass if the total number of elements matches
assert output_reshaped.size(0) == target_reshaped.size(0), f"Output and target sizes must match: {output_reshaped.size()} vs {target_reshaped.size()}"
loss = criterion(output_reshaped, target_reshaped)
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f'Epoch {epoch + 1}, Loss: {total_loss / len(dataloader)}')
# evaluation
def evaluate(model, dataloader):
model.eval()
total_loss = 0
with torch.no_grad():
for x, y in dataloader:
output = model(x, y[:, :-1])
# debugging
batch_size, seq_len, vocab_size = output.size()
output_reshaped = output.view(batch_size * seq_len, vocab_size)
target_reshaped = y[:, 1:].contiguous().view(-1)
loss = criterion(output_reshaped, target_reshaped)
total_loss += loss.item()
return total_loss / len(dataloader)
# prediction
def predict(model, input_sequence, max_length=50):
model.eval()
input_tensor = torch.tensor(input_sequence, dtype=torch.long).unsqueeze(0)
generated_sequence = input_sequence
for _ in range(max_length - len(input_sequence)):
with torch.no_grad():
output = model(input_tensor, input_tensor)
next_token = torch.argmax(output[:, -1, :], dim=-1).item()
generated_sequence.append(next_token)
input_tensor = torch.tensor(generated_sequence, dtype=torch.long).unsqueeze(0)
return generated_sequence
If someone could also help me with the overall structure of my transformer, that would be great, because I'm really stuck. Thanks!
And this is my error:
Src embedded shape: torch.Size([32, 49, 128])
Tgt embedded shape: torch.Size([32, 48, 128])
Enc output shape after layer: torch.Size([32, 32, 49, 128])
Enc output shape after layer: torch.Size([32, 32, 49, 128])
Enc output shape after layer: torch.Size([32, 32, 49, 128])
Enc output shape after layer: torch.Size([32, 32, 49, 128])
Enc output shape after layer: torch.Size([32, 32, 49, 128])
Enc output shape after layer: torch.Size([32, 32, 49, 128])
Dec output shape after layer: torch.Size([32, 32, 48, 128])
Dec output shape after layer: torch.Size([32, 32, 48, 128])
Dec output shape after layer: torch.Size([32, 32, 48, 128])
Dec output shape after layer: torch.Size([32, 32, 48, 128])
Dec output shape after layer: torch.Size([32, 32, 48, 128])
Dec output shape after layer: torch.Size([32, 32, 48, 128])
Final output shape: torch.Size([32, 32, 7200])
Output size before reshaping: torch.Size([32, 32, 7200])
---------------------------------------------------------------------------
AssertionError Traceback (most recent call last)
File <command-1586568334099217>, line 36
33 target_reshaped = y[:, 1:].contiguous().view(-1)
35 # The assertion should now pass if the total number of elements matches
---> 36 assert output_reshaped.size(0) == target_reshaped.size(0), f"Output and target sizes must match: {output_reshaped.size()} vs {target_reshaped.size()}"
38 loss = criterion(output_reshaped, target_reshaped)
39 loss.backward()
AssertionError: Output and target sizes must match: torch.Size([1024, 7200]) vs torch.Size([1536])
Your attention masks have an extra dimension, which is propagated to your activations. Your inputs (after embedding) are of shape (bs, sl, d_model). This shape (with three dimensions) should be maintained after every transformer layer. The extra dimension on your attention masks casts the output to (bs, bs, sl, d_model).
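If it helps, here is a minimal sketch (untested, written as a standalone function rather than your method, and assuming 0 is the padding index as in your dataset) of masks that stay three-dimensional and broadcast against the (bs, sl_q, sl_k) attention scores your scaled_dot_product_attention produces:
import torch
def generate_mask(src, tgt, pad_idx=0):
    # Padding mask over the encoder keys: (bs, 1, src_len), broadcasts over query positions.
    src_mask = (src != pad_idx).unsqueeze(1)
    # Padding mask over the decoder keys: (bs, 1, tgt_len).
    tgt_pad_mask = (tgt != pad_idx).unsqueeze(1)
    # Causal mask so position i only attends to positions <= i: (tgt_len, tgt_len).
    seq_length = tgt.size(1)
    nopeak_mask = torch.tril(torch.ones(seq_length, seq_length, dtype=torch.bool, device=tgt.device))
    # Combined decoder self-attention mask: (bs, tgt_len, tgt_len).
    tgt_mask = tgt_pad_mask & nopeak_mask
    return src_mask, tgt_mask
src_mask, tgt_mask = generate_mask(torch.randint(1, 100, (32, 49)), torch.randint(1, 100, (32, 48)))
print(src_mask.shape, tgt_mask.shape)  # torch.Size([32, 1, 49]) torch.Size([32, 48, 48])
With masks shaped like this, masked_fill(mask == 0, -1e9) broadcasts against the score tensor without adding an extra batch dimension, so each layer's output keeps the (bs, sl, d_model) shape.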
Referencing the PyTorch attention implementation, the key padding mask should be of shape (bs, sl) and the attention mask should be of shape (bs, sl_decoder, sl_encoder).
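For comparison, a small illustrative example of the shapes torch.nn.MultiheadAttention expects with batch_first=True (num_heads=1 to match your single-head setup); a 2-D attn_mask of shape (sl_q, sl_k) is also accepted in place of the 3-D (bs * num_heads, sl_q, sl_k) form:
import torch
import torch.nn as nn
bs, sl, d_model = 32, 48, 128
attn = nn.MultiheadAttention(embed_dim=d_model, num_heads=1, batch_first=True)
x = torch.randn(bs, sl, d_model)
# Key padding mask: (bs, sl); True marks padded positions to ignore.
key_padding_mask = torch.zeros(bs, sl, dtype=torch.bool)
# Causal attention mask: (sl, sl); True above the diagonal blocks attention to future positions.
attn_mask = torch.triu(torch.ones(sl, sl, dtype=torch.bool), diagonal=1)
out, _ = attn(x, x, x, key_padding_mask=key_padding_mask, attn_mask=attn_mask)
print(out.shape)  # torch.Size([32, 48, 128]) -- still (bs, sl, d_model)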