I am trying to do binary classification by finding a pattern (say "CTCATGTCA") in a DNA sequence using a CNN. I wrote a model in PyTorch. When the pattern is at the center of the sequence, the model detects it, but when the pattern appears at a random position, the model does not work. How can I make the CNN invariant to the position of the pattern?
This is my code:
import logging
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
from sklearn import metrics
from skorch import NeuralNetClassifier
from skorch.callbacks import EpochScoring
from torch.utils.data import DataLoader, Dataset
import numpy as np
import constants
timber = logging.getLogger()
logging.basicConfig(level=logging.INFO) # change to level=logging.DEBUG to print more logs...
# utils
def one_hot_e(dna_seq: str) -> np.ndarray:
    mydict = {'A': np.asarray([1.0, 0.0, 0.0, 0.0]), 'C': np.asarray([0.0, 1.0, 0.0, 0.0]),
              'G': np.asarray([0.0, 0.0, 1.0, 0.0]), 'T': np.asarray([0.0, 0.0, 0.0, 1.0]),
              'N': np.asarray([0.0, 0.0, 0.0, 0.0]), 'H': np.asarray([0.0, 0.0, 0.0, 0.0]),
              'a': np.asarray([1.0, 0.0, 0.0, 0.0]), 'c': np.asarray([0.0, 1.0, 0.0, 0.0]),
              'g': np.asarray([0.0, 0.0, 1.0, 0.0]), 't': np.asarray([0.0, 0.0, 0.0, 1.0]),
              'n': np.asarray([0.0, 0.0, 0.0, 0.0]), '-': np.asarray([0.0, 0.0, 0.0, 0.0])}
    size_of_a_seq: int = len(dna_seq)
    # forward = np.zeros(shape=(size_of_a_seq, 4))
    forward_list: list = [mydict[dna_seq[i]] for i in range(0, size_of_a_seq)]
    encoded = np.asarray(forward_list)
    return encoded
def one_hot_e_column(column: pd.Series) -> np.ndarray:
    tmp_list: list = [one_hot_e(seq) for seq in column]
    encoded_column = np.asarray(tmp_list)
    return encoded_column
def reverse_dna_seq(dna_seq: str) -> str:
    # m_reversed = ""
    # for i in range(0, len(dna_seq)):
    #     m_reversed = dna_seq[i] + m_reversed
    # return m_reversed
    return dna_seq[::-1]
def complement_dna_seq(dna_seq: str) -> str:
    comp_map = {"A": "T", "C": "G", "T": "A", "G": "C",
                "a": "t", "c": "g", "t": "a", "g": "c",
                "N": "N", "H": "H", "-": "-",
                "n": "n", "h": "h"
                }
    comp_dna_seq_list: list = [comp_map[nucleotide] for nucleotide in dna_seq]
    comp_dna_seq: str = "".join(comp_dna_seq_list)
    return comp_dna_seq
def reverse_complement_dna_seq(dna_seq: str) -> str:
    return reverse_dna_seq(complement_dna_seq(dna_seq))
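    # e.g. for the motif in the question: reverse_complement_dna_seq("CTCATGTCA") returns "TGACATGAG"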
def reverse_complement_dna_seqs(column: pd.Series) -> pd.Series:
    tmp_list: list = [reverse_complement_dna_seq(seq) for seq in column]
    rc_column = pd.Series(tmp_list)
    return rc_column
class CNN1D(nn.Module):
    def __init__(self,
                 in_channel_num_of_nucleotides=4,
                 kernel_size_k_mer_motif=4,
                 dnn_size=256,
                 num_filters=1,
                 lstm_hidden_size=128,
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conv1d = nn.Conv1d(in_channels=in_channel_num_of_nucleotides, out_channels=num_filters,
                                kernel_size=kernel_size_k_mer_motif, stride=2)
        self.activation = nn.ReLU()
        self.pooling = nn.MaxPool1d(kernel_size=kernel_size_k_mer_motif, stride=2)
        self.flatten = nn.Flatten()
        # linear layer
        self.dnn2 = nn.Linear(in_features=14 * num_filters, out_features=dnn_size)
        self.act2 = nn.Sigmoid()
        self.dropout2 = nn.Dropout(p=0.2)
        self.out = nn.Linear(in_features=dnn_size, out_features=1)
        self.out_act = nn.Sigmoid()
        pass

    def forward(self, x):
        timber.debug(constants.magenta + f"h0: {x}")
        h = self.conv1d(x)
        timber.debug(constants.green + f"h1: {h}")
        h = self.activation(h)
        timber.debug(constants.magenta + f"h2: {h}")
        h = self.pooling(h)
        timber.debug(constants.blue + f"h3: {h}")
        timber.debug(constants.cyan + f"h4: {h}")
        h = self.flatten(h)
        timber.debug(constants.magenta + f"h5: {h},\n shape {h.shape}, size {h.size}")
        h = self.dnn2(h)
        timber.debug(constants.green + f"h6: {h}")
        h = self.act2(h)
        timber.debug(constants.blue + f"h7: {h}")
        h = self.dropout2(h)
        timber.debug(constants.cyan + f"h8: {h}")
        h = self.out(h)
        timber.debug(constants.magenta + f"h9: {h}")
        h = self.out_act(h)
        timber.debug(constants.green + f"h10: {h}")
        # h = (h > 0.5).float()  # <---- should this go here?
        # timber.debug(constants.green + f"h11: {h}")
        return h
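# Note: in_features=14 * num_filters in the constructor above assumes 64-base input sequences:
# Conv1d(kernel_size=4, stride=2) maps 64 positions to (64 - 4) // 2 + 1 = 31, then
# MaxPool1d(kernel_size=4, stride=2) maps 31 positions to (31 - 4) // 2 + 1 = 14.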
class CustomDataset(Dataset):
    def __init__(self, dataframe):
        self.x = dataframe["Sequence"]
        self.y = dataframe["class"]

    def __len__(self):
        return len(self.y)

    def preprocessing(self, x1, y1) -> (torch.Tensor, torch.Tensor, torch.Tensor):
        forward_col = x1
        backward_col = reverse_complement_dna_seqs(forward_col)
        forward_one_hot_e_col: np.ndarray = one_hot_e_column(forward_col)
        backward_one_hot_e_col: np.ndarray = one_hot_e_column(backward_col)
        tr_xf_tensor = torch.Tensor(forward_one_hot_e_col).permute(1, 2, 0)
        tr_xb_tensor = torch.Tensor(backward_one_hot_e_col).permute(1, 2, 0)
        # timber.debug(f"y1 {y1}")
        tr_y1 = np.array([y1])  # <--- need to put it inside brackets
        return tr_xf_tensor, tr_xb_tensor, tr_y1

    def __getitem__(self, idx):
        m_seq = self.x.iloc[idx]
        labels = self.y.iloc[idx]
        xf, xb, y = self.preprocessing(m_seq, labels)
        timber.debug(f"xf -> {xf.shape}, xb -> {xb.shape}, y -> {y}")
        return xf, xb, y
def test_dataloader():
    df = pd.read_csv("todo.csv")
    X = df["Sequence"]
    y = df["class"]
    ds = CustomDataset(df)
    loader = DataLoader(ds, shuffle=True, batch_size=16)
    train_loader = loader
    for data in train_loader:
        timber.debug(data)
        # xf, xb, y = data[0], data[1], data[2]
        # timber.debug(f"xf -> {xf.shape}, xb -> {xb.shape}, y -> {y.shape}")
    pass
def get_callbacks() -> list:
    # metric.auc (uses trapezoidal rule) gave an error: x is neither increasing, nor decreasing. so I had to remove it
    return [
        ("tr_acc", EpochScoring(
            metrics.accuracy_score,
            lower_is_better=False,
            on_train=True,
            name="train_acc",
        )),
        ("tr_recall", EpochScoring(
            metrics.recall_score,
            lower_is_better=False,
            on_train=True,
            name="train_recall",
        )),
        ("tr_precision", EpochScoring(
            metrics.precision_score,
            lower_is_better=False,
            on_train=True,
            name="train_precision",
        )),
        ("tr_roc_auc", EpochScoring(
            metrics.roc_auc_score,
            lower_is_better=False,
            on_train=False,
            name="tr_auc"
        )),
        ("tr_f1", EpochScoring(
            metrics.f1_score,
            lower_is_better=False,
            on_train=False,
            name="tr_f1"
        )),
        # ("valid_acc1", EpochScoring(
        #     metrics.accuracy_score,
        #     lower_is_better=False,
        #     on_train=False,
        #     name="valid_acc1",
        # )),
        ("valid_recall", EpochScoring(
            metrics.recall_score,
            lower_is_better=False,
            on_train=False,
            name="valid_recall",
        )),
        ("valid_precision", EpochScoring(
            metrics.precision_score,
            lower_is_better=False,
            on_train=False,
            name="valid_precision",
        )),
        ("valid_roc_auc", EpochScoring(
            metrics.roc_auc_score,
            lower_is_better=False,
            on_train=False,
            name="valid_auc"
        )),
        ("valid_f1", EpochScoring(
            metrics.f1_score,
            lower_is_better=False,
            on_train=False,
            name="valid_f1"
        ))
    ]
def start():
    # df = pd.read_csv("data64.csv")  # use this line
    df = pd.read_csv("data64random.csv")
    X = df["Sequence"]
    y = df["class"]
    npa = np.array([y.values])
    torch_tensor = torch.tensor(npa)  # [0, 1, 1, 0, ... ... ] a simple list
    print(f"torch_tensor: {torch_tensor}")
    # need to transpose it!
    yt = torch.transpose(torch_tensor, 0, 1)
    ds = CustomDataset(df)
    loader = DataLoader(ds, shuffle=True)
    # train_loader = loader
    # test_loader = loader  # todo: load another dataset later
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = CNN1D().to(device)
    m_criterion = nn.BCEWithLogitsLoss
    # optimizer = optim.Adam(model.parameters(), lr=0.001)
    m_optimizer = optim.Adam
    net = NeuralNetClassifier(
        model,
        max_epochs=200,
        criterion=m_criterion,
        optimizer=m_optimizer,
        lr=0.01,
        # decay=0.01,
        # momentum=0.9,
        device=device,
        classes=["no_mqtl", "yes_mqtl"],
        verbose=True,
        callbacks=get_callbacks()
    )
    ohe_c = one_hot_e_column(X)
    print(f"ohe_c shape {ohe_c.shape}")
    ohe_c = torch.Tensor(ohe_c)
    ohe_c = ohe_c.permute(0, 2, 1)
    ohe_c = ohe_c.to(device)
    print(f"ohe_c shape {ohe_c.shape}")
    net.fit(X=ohe_c, y=yt)
    y_proba = net.predict_proba(ohe_c)
    # timber.info(f"y_proba = {y_proba}")
    pass
if __name__ == '__main__':
    start()
    # test_dataloader()
    pass
You can find the two datasets, and quickly download everything, using this gist link.
The model below uses Conv1d layers and gets to a validation accuracy of 99% to 100%. It didn't take much tuning to get there.
I trained on 60% of the data, with 30% used for validation. The model is relatively small, at about 700 parameters.
The model has a wide receptive field at the input (kernel_size=9, matching the 9-base pattern length). Model performance was quite sensitive to this initial configuration. The subsequent conv layers double the feature size to 8 and trim the sequence slightly. A final maxpool layer halves the remaining sequence length, before it's flattened and mapped to a scalar via a dense layer.
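To make those sizes concrete, here is a quick sketch (assuming the 64-base sequences from data64random.csv, as in the full code further down) tracing how the sequence length shrinks through the layers, and where the 8 * 30 input of the final linear layer comes from:

# Sketch: tracing the sequence length through the model below, for a 64-base input
seq_len = 64
after_wide_conv = seq_len                      # kernel_size=9, padding='same' keeps 64
after_conv_block_1 = after_wide_conv - 2       # kernel_size=3, no padding -> 62
after_conv_block_2 = after_conv_block_1 - 2    # kernel_size=3, no padding -> 60
after_maxpool = after_conv_block_2 // 2        # MaxPool1d(kernel_size=2, stride=2) -> 30
flattened = 8 * after_maxpool                  # 8 channels * 30 positions = 240, hence nn.Linear(8 * 30, 1)
print(after_maxpool, flattened)                # 30 240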
I found that NAdam worked better than Adam, and didn't explore optimisers further. Batch sizes of 8 and below worked well.
...
[epoch 13] trn loss: 0.011 [acc: 100.000%] | val loss: 0.011 [acc: 100.000%]
[epoch 14] trn loss: 0.014 [acc: 100.000%] | val loss: 0.009 [acc: 99.667%]
[epoch 15] trn loss: 0.008 [acc: 100.000%] | val loss: 0.006 [acc: 100.000%]
I tried conv layers rather than recurrent cells as they tend to be faster and perform well. Since performance was good I didn't look into LSTMs. Your model seems quite large - try making it smaller and seeing if that improves things.
Code for preparing the data, defining and training the model, and printing/plotting the results.
import numpy as np
from matplotlib import pyplot as plt
import pandas as pd
import torch
from torch.utils.data import DataLoader
from torch import nn
np.random.seed(0)
def one_hot_e(dna_seq: str) -> np.ndarray:
    mydict = {'A': np.asarray([1.0, 0.0, 0.0, 0.0]), 'C': np.asarray([0.0, 1.0, 0.0, 0.0]),
              'G': np.asarray([0.0, 0.0, 1.0, 0.0]), 'T': np.asarray([0.0, 0.0, 0.0, 1.0]),
              'N': np.asarray([0.0, 0.0, 0.0, 0.0]), 'H': np.asarray([0.0, 0.0, 0.0, 0.0]),
              'a': np.asarray([1.0, 0.0, 0.0, 0.0]), 'c': np.asarray([0.0, 1.0, 0.0, 0.0]),
              'g': np.asarray([0.0, 0.0, 1.0, 0.0]), 't': np.asarray([0.0, 0.0, 0.0, 1.0]),
              'n': np.asarray([0.0, 0.0, 0.0, 0.0]), '-': np.asarray([0.0, 0.0, 0.0, 0.0])}
    size_of_a_seq: int = len(dna_seq)
    # forward = np.zeros(shape=(size_of_a_seq, 4))
    forward_list: list = [mydict[dna_seq[i]] for i in range(0, size_of_a_seq)]
    encoded = np.asarray(forward_list)
    return encoded
#
#Load and prepare data "CTCATGTCA"
#
df = pd.read_csv('data64random.csv')
#To numpy arrays, and encode X
X = np.stack([one_hot_e(row) for row in df.Sequence], axis=0)
y = df['class'].values
#Shuffle and split
train_size = int(0.6 * len(X))
val_size = int(0.3 * len(X))
shuffle_ixs = np.random.permutation(len(X))
X, y = [arr[shuffle_ixs] for arr in [X, y]]
X_train, y_train = [arr[:train_size] for arr in [X, y]]
X_val, y_val = [arr[train_size:train_size + val_size] for arr in [X, y]]
#As tensors. Useful for passing directly to model as a single large batch.
X_train_t, y_train_t = [torch.tensor(arr).float() for arr in [X_train, y_train]]
X_val_t, y_val_t = [torch.tensor(arr).float() for arr in [X_val, y_val]]
#
#Define the model
#
#Lambda layer useful for simple manipulations
class LambdaLayer(nn.Module):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def forward(self, x):
        return self.func(x)
#The model. The input arrives as (batch, 64, chan).
seq_len = X[0].shape[0] #64 characters long
n_features = X[0].shape[1] #4-dim onehot encoding
torch.manual_seed(0)
model = nn.Sequential(
    #> (batch, seq_len, channels)
    LambdaLayer(lambda x: x.swapdims(1, 2)),
    #> (batch, channels, seq_len)

    #Initial wide receptive field (and it matches length of the pattern)
    nn.Conv1d(in_channels=n_features, out_channels=4, kernel_size=9, padding='same'),
    nn.ReLU(),
    nn.BatchNorm1d(num_features=4),

    #Conv block 1 doubles features
    nn.Conv1d(in_channels=4, out_channels=8, kernel_size=3),
    nn.ReLU(),
    nn.BatchNorm1d(num_features=8),

    #Conv block 2, then maxpool
    nn.Conv1d(in_channels=8, out_channels=8, kernel_size=3),
    nn.ReLU(),
    nn.BatchNorm1d(num_features=8),

    #Output layer: flatten, linear
    nn.MaxPool1d(kernel_size=2, stride=2),  #batch, feat, seq
    nn.Flatten(start_dim=1),  #batch, feat*seq
    nn.Linear(8 * 30, 1),
)
print(
    'Model size is',
    sum([p.numel() for p in model.parameters() if p.requires_grad]),
    'trainable parameters'
)
#Train loader for batchifying train data
train_loader = DataLoader(list(zip(X_train_t, y_train_t)), shuffle=True, batch_size=8)
optimiser = torch.optim.NAdam(model.parameters())
loss_fn = nn.BCEWithLogitsLoss()
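# Note: the network's last layer is a plain nn.Linear, so it outputs raw logits;
# BCEWithLogitsLoss applies the sigmoid internally, which is why no output
# activation is needed at the end of the model.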
from collections import defaultdict
metrics_dict = defaultdict(list)
for epoch in range(n_epochs := 15):
    model.train()
    cum_loss = 0
    for X_minibatch, y_minibatch in train_loader:
        logits = model(X_minibatch).ravel()
        loss = loss_fn(logits, y_minibatch)

        optimiser.zero_grad()
        loss.backward()
        optimiser.step()

        cum_loss += loss.item() * len(X_minibatch)
    #/end of epoch

    time_to_print = (epoch == 0) or ((epoch + 1) % 1) == 0
    if not time_to_print:
        continue

    model.eval()
    with torch.no_grad():
        val_logits = model(X_val_t).ravel()
        trn_logits = model(X_train_t).ravel()

    val_loss = loss_fn(val_logits, y_val_t).item()
    trn_loss = cum_loss / len(X_train_t)
    val_acc = ((nn.Sigmoid()(val_logits) > 0.5).int() == y_val_t.int()).float().mean().item()
    trn_acc = ((nn.Sigmoid()(trn_logits) > 0.5).int() == y_train_t.int()).float().mean().item()
    print(
        f'[epoch {epoch + 1:>3d}]',
        f'trn loss: {trn_loss:>5.3f} [acc: {trn_acc:>8.3%}] |',
        f'val loss: {val_loss:>5.3f} [acc: {val_acc:>8.3%}]'
    )

    #Record metrics
    metrics_dict['epoch'].append(epoch + 1)
    metrics_dict['trn_loss'].append(trn_loss)
    metrics_dict['val_loss'].append(val_loss)
    metrics_dict['trn_acc'].append(trn_acc)
    metrics_dict['val_acc'].append(val_acc)
#View training curves
metrics_df = pd.DataFrame(metrics_dict).set_index('epoch')
ax = metrics_df.plot(
    use_index=True, y=['trn_loss', 'val_loss'], ylabel='loss',
    figsize=(8, 4), legend=False, linewidth=3, marker='s', markersize=7
)
metrics_df.mul(100).plot(
    use_index=True, y=['trn_acc', 'val_acc'], ylabel='acc',
    linestyle='--', marker='o', ax=ax.twinx(), legend=False
)
ax.figure.legend(ncol=2)
ax.set_title('training curves')