Search code examples
tensorflowkerassubclassing

TypeError when trying to make a loop creating artificial neural networks


I am working on an artifical neural network which I have created via subclassing.

The subclassing looks like this:

import time
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import scipy.stats as si
import sympy as sy
from sympy.stats import Normal, cdf
from sympy import init_printing



class DGMNet(tf.keras.Model):
    
    def __init__(self, n_layers, n_nodes, dimensions=1):
        """
        Parameters:
            - n_layers:     number of layers
            - n_nodes:      number of nodes in (inner) layers
            - dimensions:   number of spacial dimensions
        """
        super().__init__()
        
        self.n_layers = n_layers

        self.initial_layer = DenseLayer(dimensions + 1, n_nodes, activation="relu")
        self.lstmlikelist = []
        for _ in range(self.n_layers):
            self.lstmlikelist.append(LSTMLikeLayer(dimensions + 1, n_nodes, activation="relu"))
        self.final_layer = DenseLayer(n_nodes, 1, activation=None)


    def call(self, t, x):
        X = tf.concat([t,x], 1)

        S = self.initial_layer.call(X)
        for i in range(self.n_layers):
            S = self.lstmlikelist[i].call({'S': S, 'X': X})
        result = self.final_layer.call(S)

        return result

class DenseLayer(tf.keras.layers.Layer):
        

  def __init__(self, n_inputs, n_outputs, activation):
      """
      Parameters:
      - n_inputs:     number of inputs
      - n_outputs:    number of outputs
      - activation:   activation function
      """
      super(DenseLayer, self).__init__()
      self.n_inputs = n_inputs
      self.n_outputs = n_outputs
      self.W = self.add_weight(shape=(self.n_inputs, self.n_outputs),
                               initializer='random_normal',
                               trainable=True)
      self.b = self.add_weight(shape=(1, self.n_outputs),
                               initializer='random_normal',
                               trainable=True)
      self.activation = _get_function(activation)
      
  def call(self, inputs):
        S = tf.add(tf.matmul(inputs, self.W), self.b)
        S = self.activation(S)

    

    return S

class LSTMLikeLayer(tf.keras.layers.Layer):
    def __init__(self, n_inputs, n_outputs, activation):
        """
        Parameters:
            - n_inputs:     number of inputs
            - n_outputs:    number of outputs
            - activation:   activation function
        """
      
        super(LSTMLikeLayer, self).__init__()

        self.n_outputs = n_outputs
        self.n_inputs = n_inputs

 
        
        self.Uz = self.add_variable("Uz", shape=[self.n_inputs, self.n_outputs])
        self.Ug = self.add_variable("Ug", shape=[self.n_inputs, self.n_outputs])
        self.Ur = self.add_variable("Ur", shape=[self.n_inputs, self.n_outputs])
        self.Uh = self.add_variable("Uh", shape=[self.n_inputs, self.n_outputs])
        self.Wz = self.add_variable("Wz", shape=[self.n_outputs, self.n_outputs])
        self.Wg = self.add_variable("Wg", shape=[self.n_outputs, self.n_outputs])
        self.Wr = self.add_variable("Wr", shape=[self.n_outputs, self.n_outputs])
        self.Wh = self.add_variable("Wh", shape=[self.n_outputs, self.n_outputs])
        self.bz = self.add_variable("bz", shape=[1, self.n_outputs])
        self.bg = self.add_variable("bg", shape=[1, self.n_outputs])
        self.br = self.add_variable("br", shape=[1, self.n_outputs])
        self.bh = self.add_variable("bh", shape=[1, self.n_outputs])
        

        self.activation = _get_function(activation)


    
    def call(self, inputs):
        S = inputs['S']
        X = inputs['X']

        Z = self.activation(tf.add(tf.add(tf.matmul(X, self.Uz), tf.matmul(S, self.Wz)), self.bz))
        G = self.activation(tf.add(tf.add(tf.matmul(X, self.Ug), tf.matmul(S, self.Wg)), self.bg))
        R = self.activation(tf.add(tf.add(tf.matmul(X, self.Ur), tf.matmul(S, self.Wr)), self.br))
        H = self.activation(tf.add(tf.add(tf.matmul(X, self.Uh), tf.matmul(tf.multiply(S, R), self.Wh)), self.bh))
        Snew = tf.add(tf.multiply(tf.subtract(tf.ones_like(G), G), H), tf.multiply(Z, S))

        return Snew



def _get_function(name):
    f = None
    if name == "tanh":
        f = tf.nn.tanh
    elif name == "sigmoid":
        f = tf.nn.sigmoid
    elif name == "relu":
        f = tf.nn.relu
    elif not name:
        f = tf.identity
    
    assert f is not None
    
    return f

# Sampling
def sampler(N1, N2, N3):
    np.random.seed(42)
    # Sampler #1: PDE domain
    t1 = np.random.uniform(low=T0,
                           high=T,
                           size=[N1,1])
    s1 = np.random.uniform(low=S1,
                           high=S2,
                           size=[N1,1])

    # Sampler #2: boundary condition
    t2 = np.zeros(shape=(1, 1))
    s2 = np.zeros(shape=(1, 1))
    
    # Sampler #3: initial/terminal condition
    t3 = T * np.ones((N3,1)) #Terminal condition
    s3 = np.random.uniform(low=S1,
                           high=S2,
                           size=[N3,1])
    
    return (t1, s1, t2, s2, t3, s3)

    # Loss function
def loss(model, t1, x1, t2, x2, t3, x3):
    # Loss term #1: PDE
    V = model(t1, x1)
    V_t = tf.gradients(V, t1)[0]
    V_x = tf.gradients(V, x1)[0]
    V_xx = tf.gradients(V_x, x1)[0]
    f = V_t + r*x1*V_x + 0.5*sigma**2*x1**2*V_xx - r*V 

    L1 = tf.reduce_mean(tf.square(f))

    # Loss term #2: boundary condition
    #L2 = tf.reduce_mean(tf.square(V)) 
    
    # Loss term #3: initial/terminal condition
    L3 = tf.reduce_mean(tf.square(model(t3, x3) - tf.math.maximum(x3-K,0)))

    return (L1, L3)

    # B-S's analytical known solution
def analytical_solution(t, x):
    #C = SN(d1) - Xe- rt N(d2)
    #S: spot price
    #K: strike price
    #T: time to maturity
    #r: interest rate
    #sigma: volatility of underlying asset
    
    d1 = (np.log(x / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = (np.log(x / K) + (r - 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    
    call = (x * si.norm.cdf(d1, 0.0, 1.0) - K * np.exp(-r * T) * si.norm.cdf(d2, 0.0, 1.0))
    
    return call


# Set random seeds
np.random.seed(42)
tf.random.set_seed(42)

# Strike price

K = 0.5

# PDE parameters
r = 0.05            # Interest rate
sigma = 0.25        # Volatility

# Time limits
T0 = 0.0 + 1e-10    # Initial time
T  = 1.0            # Terminal time

# Space limits
S1 = 0.0 + 1e-10    # Low boundary
S2 = 1.0            # High boundary

# Number of samples
NS_1 = 1000
NS_2 = 0
NS_3 = 100


t1, s1, t2, s2, t3, s3 = sampler(NS_1, NS_2, NS_3)

Now what I want to do is to iterate over different parameters and create a new ann for each iteration. My plan was to do it in this way:

tf.compat.v1.disable_eager_execution()
t1_t = tf.compat.v1.placeholder(tf.float32, [None,1])
x1_t = tf.compat.v1.placeholder(tf.float32, [None,1])
t2_t = tf.compat.v1.placeholder(tf.float32, [None,1])
x2_t = tf.compat.v1.placeholder(tf.float32, [None,1])
t3_t = tf.compat.v1.placeholder(tf.float32, [None,1])
x3_t = tf.compat.v1.placeholder(tf.float32, [None,1])

volatility_list = [0.08]#[0.08, 0.16, 0.18, 0.2, 0.28]
stages_list = [10]#, 50, 100]
layers_list = [3]#, 5, 7]
npl_list = [3]#, 6, 9, 12, 15]


for sigma in volatility_list:
  for st in stages_list:
    for lay in layers_list:
      for npl in npl_list:

        # Neural Network definition
        num_layers = lay
        nodes_per_layer = npl
        ann = DGMNet(num_layers, nodes_per_layer)
        
        
        L1_t, L3_t = loss(ann, t1_t, x1_t, t2_t, x2_t, t3_t, x3_t)
        loss_t = L1_t  + L3_t

        # Optimizer parameters
        global_step = tf.Variable(1, trainable=False)
        starter_learning_rate = 0.001
        learning_rate = tf.compat.v1.train.exponential_decay(starter_learning_rate, global_step,
                                           100000, 0.96, staircase=True)
        optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss_t)

        # Training parameters
        steps_per_sample = st
        sampling_stages = 100#2000

        # Plot tensors
        tplot_t = tf.compat.v1.placeholder(tf.float32, [None,1], name="tplot_t") # We name to recover it later
        xplot_t = tf.compat.v1.placeholder(tf.float32, [None,1], name="xplot_t")
        vplot_t = tf.identity(ann(tplot_t, xplot_t), name="vplot_t") # Trick for naming the trained model    

        # Training data holders
        sampling_stages_list = []
        elapsed_time_list = []
        loss_list = []
        L1_list = []
        L3_list = [] 

        # Train network!!
        init_op = tf.compat.v1.global_variables_initializer()

        sess = tf.compat.v1.Session()
        sess.run(init_op)

        for i in range(sampling_stages):
          t1, x1, t2, x2, t3, x3 = sampler(NS_1, NS_2, NS_3)

          start_time = time.clock()
          for _ in range(steps_per_sample):
            loss, L1, L3, _ = sess.run([loss_t, L1_t, L3_t, optimizer],
                               feed_dict = {t1_t:t1, x1_t:x1, t2_t:t2, x2_t:x2, t3_t:t3, x3_t:x3})
          end_time = time.clock()
          elapsed_time = end_time - start_time

          sampling_stages_list.append(i)
          elapsed_time_list.append(elapsed_time)
          loss_list.append(loss)
          L1_list.append(L1)
          L3_list.append(L3)
    
          text = "Stage: {:04d}, Loss: {:e}, L1: {:e}, L3: {:e}, {:f} seconds".format(i, loss, L1, L3, elapsed_time)
          print(text)

        #goodness of fit
        time_0 = 0
        listofzeros = [time_0] * 100
        prices_for_goodness = np.linspace(S1,S2, 100)
        goodness_list = []
        solution_goodness = analytical_solution(listofzeros, prices_for_goodness)
        ttt = time_0*np.ones_like(prices_for_goodness.reshape(-1,1))
        nn_goodness, = sess.run([vplot_t],
                    feed_dict={tplot_t:ttt, xplot_t:prices_for_goodness.reshape(-1,1)})

          
        deviation_list = np.abs(solution_goodness - nn_goodness)/(T-T0)
        

        print("{0:.2f}%".format(np.average(deviation_list)*100))

Unfortunately as soon as it ends the first iteration I get a TypeError that 'numpy.float32' object is not callable Error Traceback:

TypeError                                 Traceback (most recent call last)

<ipython-input-14-bb14643d0c42> in <module>()
     10 
     11 
---> 12         L1_t, L3_t = loss(ann, t1_t, x1_t, t2_t, x2_t, t3_t, x3_t)
     13         loss_t = L1_t  + L3_t
     14 

TypeError: 'numpy.float32' object is not callable

I guess that the problem is with the creation of the placeholders, however I am not sure how to solve it. Maybe one of you can help me

Thanks in advance! Chris


Solution

  • Did you create a variable called 'loss'? It seems that the loss function is redefined by a variable with the same name, so then python tries to call that variable as a function.