Tags: python, tensorflow, keras, deep-learning, sequential

How can I implement this model?


Problem statement

I have 3 classes (A, B, and C). I have 6 features:

train_x = [[ 6.442  6.338  7.027  8.789 10.009 12.566]
           [ 6.338  7.027  5.338 10.009  8.122 11.217]
           [ 7.027  5.338  5.335  8.122  5.537  6.408]
           [ 5.338  5.335  5.659  5.537  5.241  7.043]]

These features represent a 5-character string pattern composed of the 3 classes (e.g. AABBC, etc.). Each 5-character string pattern is one-hot encoded as follows:

train_z = [[0. 0. 1. 0. 0. 1. 0. 0. 1. 0. 0. 1. 1. 0. 0.]    
           [0. 0. 1. 0. 0. 1. 0. 0. 1. 1. 0. 0. 1. 0. 0.]
           [0. 0. 1. 0. 0. 1. 1. 0. 0. 1. 0. 0. 1. 0. 0.]    
           [0. 0. 1. 1. 0. 0. 1. 0. 0. 1. 0. 0. 0. 0. 1.]]
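
For example, the first row of train_z above decodes as CCCCA, using A → [1, 0, 0], B → [0, 1, 0], C → [0, 0, 1] concatenated position by position. A minimal sketch of this encoding/decoding (the helper names here are only illustrative):

import numpy as np

CHAR_TO_ONE_HOT = {'A': [1., 0., 0.], 'B': [0., 1., 0.], 'C': [0., 0., 1.]}

def encode_pattern(pattern: str):
    # concatenate the per-character one-hot vectors into one 15-element vector
    return [bit for ch in pattern for bit in CHAR_TO_ONE_HOT[ch]]

def decode_pattern(vector):
    # argmax over each 3-element group recovers the character at that position
    return "".join("ABC"[int(np.argmax(vector[i:i + 3]))] for i in range(0, len(vector), 3))

print(encode_pattern("CCCCA"))  # matches the first row of train_z above
print(decode_pattern([0., 0., 1., 0., 0., 1., 0., 0., 1., 0., 0., 1., 1., 0., 0.]))  # CCCCA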

My implementation

I have implemented the above problem using a sequential model as follows:

import os

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

import sys
import time
import random
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
import numpy as np

# <editor-fold desc="handle GPU">
# resolve GPU related issues.
try:
    physical_devices = tf.config.list_physical_devices("GPU")
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
except Exception as e:
    print("GPU not found!")
# END of try
# </editor-fold>

# Directories and files
CLASS_INDEX = 4
FEATURE_START_INDEX = 6
OUTPUT_PATH = r"./"
INPUT_PATH = r"./"
INPUT_DATA_FILE = "dist-5.dat"
TRAINING_PROGRESS_FILE = "training.txt"
MODEL_FILE = "model.h5"

# classification size
CLASSES_COUNT = 3
FEATURES_COUNT = 6
OUTPUTS_COUNT = 15

# Network parameters.
LAYER_1_NEURON_COUNT = 128
LAYER_2_NEURON_COUNT = 128

# Training parameters.
LEARNING_RATE = 0.01
EPOCHS = 1000  # 500
BATCH_SIZE = 10
NO_OF_INPUT_LINES = 10000
VALIDATION_PART = 0.5
MODEL_SAVE_FREQUENCY = 10

# <editor-fold desc="encoding()">
# <editor-fold desc="def encode(letter)">
def encode(letter: str):
    if letter == 'A':
        return [1.0, 0.0, 0.0]
    elif letter == 'B':
        return [0.0, 1.0, 0.0]
    elif letter == 'C':
        return [0.0, 0.0, 1.0]
# </editor-fold>

# <editor-fold desc="encode_string()">
def encode_string_1(pattern_str: str):
    # Iterate over the string
    one_hot_binary_str = []
    for ch in pattern_str:
        one_hot_binary_str = one_hot_binary_str + encode(ch)
    # END of for loop
    return one_hot_binary_str
# END of function

def encode_string_2(pattern_str: str):
    # Iterate over the string
    one_hot_binary_str = []
    for ch in pattern_str:
        temp_encoded_vect = [encode(ch)]
        one_hot_binary_str = one_hot_binary_str + temp_encoded_vect
    # END of for loop
    return one_hot_binary_str
# END of function
# </editor-fold>

# <editor-fold desc="def load_data()">
def load_data_k(fname: str, class_index: int, feature_start_index: int, **selection):
    i = 0
    file = open(fname)
    if "top_n_lines" in selection:
        lines = [next(file) for _ in range(int(selection["top_n_lines"]))]
    elif "random_n_lines" in selection:
        tmp_lines = file.readlines()
        lines = random.sample(tmp_lines, int(selection["random_n_lines"]))
    else:
        lines = file.readlines()

    data_x, data_y, data_z = [], [], []
    for l in lines:
        row = l.strip().split()  # return a list of words from the line.
        x = [float(ix) for ix in row[feature_start_index:]]  # convert the words from 'feature_start_index' onward into a vector of floats.
        y = encode(row[class_index])  # one-hot encode the class character at 'class_index'.
        z = encode_string_1(row[class_index + 1])  # one-hot encode the 5-character pattern in the next column.
        data_x.append(x)  # append the vector into 'data_x'
        data_y.append(y)  # append the vector into 'data_y'
        data_z.append(z)  # append the vector into 'data_z'
    # END for l in lines

    num_rows = len(data_x)
    given_fraction = selection.get("validation_part", 1.0)
    if given_fraction > 0.9999:
        valid_x, valid_y, valid_z = data_x, data_y, data_z
    else:
        n = int(num_rows * given_fraction)
        valid_x, valid_y, valid_z = data_x[n:], data_y[n:], data_z[n:]
        data_x, data_y, data_z = data_x[:n], data_y[:n], data_z[:n]
    # END of if-else block

    tx = tf.convert_to_tensor(data_x, np.float32)
    ty = tf.convert_to_tensor(data_y, np.float32)
    tz = tf.convert_to_tensor(data_z, np.float32)
    vx = tf.convert_to_tensor(valid_x, np.float32)
    vy = tf.convert_to_tensor(valid_y, np.float32)
    vz = tf.convert_to_tensor(valid_z, np.float32)

    return tx, ty, tz, vx, vy, vz
# END of the function
# </editor-fold>
# </editor-fold>

# <editor-fold desc="def create_model()">
def create_model(n_hidden_1, n_hidden_2, num_outputs, num_features):
    # a simple sequential model
    model = tf.keras.Sequential()
    model.add(tf.keras.Input(shape=(num_features,)))
    model.add(tf.keras.layers.Dense(n_hidden_1, activation="relu"))
    model.add(tf.keras.layers.Dense(n_hidden_2, activation="relu"))
    model.add(tf.keras.layers.Dense(num_outputs))
    return model
# </editor-fold>

# custom loss to account for the dependency within each group of 3 bits
def loss(y_true, y_pred):
    l1 = tf.nn.softmax_cross_entropy_with_logits(y_true[:, :3], y_pred[:, :3])
    l2 = tf.nn.softmax_cross_entropy_with_logits(y_true[:, 3:6], y_pred[:, 3:6])
    l3 = tf.nn.softmax_cross_entropy_with_logits(y_true[:, 6:9], y_pred[:, 6:9])
    l4 = tf.nn.softmax_cross_entropy_with_logits(y_true[:, 9:12], y_pred[:, 9:12])
    l5 = tf.nn.softmax_cross_entropy_with_logits(y_true[:, 12:], y_pred[:, 12:])
    return l1 + l2 + l3 + l4 + l5


if __name__ == "__main__":
    len_int = len(sys.argv)
    arg_str = None

    if len_int > 1:
        arg_str = sys.argv[1]
    else:
        arg_str = os.path.join(INPUT_PATH, INPUT_DATA_FILE)
    # END of if len_int > 1:

    # load training data from the disk
    train_x, train_y, train_z, validate_x, validate_y, validate_z = load_data_k(
        os.path.join(INPUT_PATH, INPUT_DATA_FILE),
        class_index=CLASS_INDEX,
        feature_start_index=FEATURE_START_INDEX,
        top_n_lines=NO_OF_INPUT_LINES,
        validation_part=VALIDATION_PART
    )

    #print(train_y)
    print("z = " + str(train_z))

    # create an Adam optimizer for the NN model
    opt_function = keras.optimizers.Adam(
        learning_rate=LEARNING_RATE
    )
    # create a sequential NN model
    model = create_model(
        LAYER_1_NEURON_COUNT,
        LAYER_2_NEURON_COUNT,
        OUTPUTS_COUNT,
        FEATURES_COUNT
    )
    #
    model.compile(optimizer=opt_function, loss=loss, metrics=['accuracy'])
    model.fit(train_x, train_z, epochs=EPOCHS, batch_size=BATCH_SIZE)
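
For completeness, the held-out tensors returned by load_data_k() (validate_x, validate_z) could also be passed to fit() so that a validation score is reported each epoch; a minimal variant of the last call above:

model.fit(
    train_x, train_z,
    validation_data=(validate_x, validate_z),  # the held-out split loaded above
    epochs=EPOCHS,
    batch_size=BATCH_SIZE
)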

The problem

The problem with this source code is that the model does not converge, i.e. the accuracy does not increase as the number of epochs increases.

The question

How can I implement this model so that it converges?


Solution

  • The problem is with how Keras calculates the accuracy. For example, in the code below

    import numpy as np
    import tensorflow as tf

    y_true = np.array([[1, 0, 0, 0, 1, 0, 0, 0, 1]])
    y_pred = np.array([[.8, .1, .1, 1, 10, 2, 2, 3, 5.5]])

    metric = tf.keras.metrics.Accuracy()
    metric.update_state(y_true, y_pred)
    print(metric.result().numpy())  # 0.0
    

    The calculated accuracy is zero. However, by comparing

    1. [.8,.1,.1] with [1,0,0]
    2. [1,10,2] with [0,1,0]
    3. [2,3,5.5] with [0,0,1]

    we can see that y_pred is actually very accurate, and this is likely why your model appears not to work. To handle this problem within the current model, applying a sigmoid activation in the output layer might help; you can check this by running the following code:

    import numpy as np
    import tensorflow as tf
    import keras
    
    
    def dataset_gen(num_samples):
        # each data row consists of six floats, which is the feature vector of a 5-character
        # string pattern composed of 3 classes (e.g. AABBC, etc.)
        # to represent this 5-character string, a sequentially ordered one-hot encoding vector is used
        np.random.seed(0)
        output_classes = np.random.randint(0,3,size=(num_samples,5))
        transform_mat = np.arange(-15,15).reshape(5,6) + .1*np.random.rand(5,6)
        print(transform_mat)
        feature_vec = output_classes @ transform_mat
        output_classes += np.array([0,3,6,9,12])
        # convert output_classes to one-hot encoding 
        output_vec = np.zeros((num_samples,15))
        for ind,item in enumerate(output_classes):
            output_vec[ind][item] = 1.
        
        return feature_vec,output_vec
    
    
    def create_model():
        # a simple sequential model
        n_hidden,num_features,num_outputs = 16,6,15
        model = tf.keras.Sequential()
        model.add(tf.keras.Input(shape=(num_features,)))
        model.add(tf.keras.layers.Dense(n_hidden,activation="relu"))
        model.add(tf.keras.layers.Dense(num_outputs,activation="sigmoid"))
        return model
    
    def loss(y_true, y_pred):
        l1 = tf.nn.softmax_cross_entropy_with_logits(y_true[:, :3], y_pred[:, :3])
        l2 = tf.nn.softmax_cross_entropy_with_logits(y_true[:, 3:6], y_pred[:, 3:6])
        l3 = tf.nn.softmax_cross_entropy_with_logits(y_true[:, 6:9], y_pred[:, 6:9])
        l4 = tf.nn.softmax_cross_entropy_with_logits(y_true[:, 9:12], y_pred[:, 9:12])
        l5 = tf.nn.softmax_cross_entropy_with_logits(y_true[:, 12:], y_pred[:, 12:])
        
        return l1 + l2 + l3 + l4 + l5
    
    # an explicit optimizer instance could be passed instead of the 'adam' string below, e.g.
    # opt_function = keras.optimizers.Adam(learning_rate=.1)
    # create a sequential NN model
    model = create_model()
    model.compile(optimizer='adam', loss=loss, metrics=['accuracy'])

    # generate the synthetic dataset (the sample count here is an arbitrary choice)
    test_x, test_z = dataset_gen(1000)

    es = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', mode='max', verbose=1, patience=100)
    history = model.fit(test_x, test_z, epochs=2000, batch_size=8,
                        callbacks=[es], validation_split=0.2,
                        verbose=0)
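
    As a follow-up, note that the built-in 'accuracy' metric still compares all 15 outputs element-wise. A per-position accuracy (argmax over each group of 3) reflects the model's quality better; below is a minimal sketch of such a custom metric (the name grouped_accuracy is only illustrative):

    def grouped_accuracy(y_true, y_pred):
        # reshape the 15 outputs into 5 groups of 3 and compare the per-group argmax
        true_classes = tf.argmax(tf.reshape(y_true, (-1, 5, 3)), axis=-1)
        pred_classes = tf.argmax(tf.reshape(y_pred, (-1, 5, 3)), axis=-1)
        # per-sample fraction of correctly predicted positions
        return tf.reduce_mean(tf.cast(tf.equal(true_classes, pred_classes), tf.float32), axis=-1)

    # e.g. compile with it alongside (or instead of) the built-in metric:
    # model.compile(optimizer='adam', loss=loss, metrics=[grouped_accuracy])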