Search code examples
pythontensorflowkerascallbackpetastorm

How to print out data that goes to keras model.fit , specifically if using petastorm dataset


Update

While I appreciated AloneTogether's answer, I didn't like that I was using take() and it was separate from model.fit.

I put another answer here if you want to look at it. It involves subclassing Model. It's not too bad.

End of Update

I have a simple example, a parquet file with 8 columns named feature_# populated with 1 to 100 for each column

    feature_1      feature_2     ...      feature_8
    1              1                      1
    2              2                      2
    ...            ...                    ...
    99             99                     99
    100            100                    100

my model:

all_cols = ["feature_1","feature_2","feature_3","feature_4","feature_5","feature_6","feature_7","feature_8"]
x_cols = ["feature_1","feature_2","feature_3","feature_4","feature_5","feature_6","feature_7"]


inputs = [Input(shape=(1,),name=col) for col in x_cols]
merged = Concatenate(axis=1)(inputs)
x = Dense(50, activation="relu")(merged)
x = Dense(20,activation="relu")(x)
outputs = Dense(101,activation="softmax")(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)
opt = tf.keras.optimizers.Adam(learning_rate=.001)

model.compile(loss="sparse_categorical_crossentropy",\
                      optimizer=opt,metrics=['accuracy'])

I use petastorm like so:

batch_size = 4

with make_batch_reader('%s/df_100.parquet' % data_dir, num_epochs=1,
                                   schema_fields=all_cols) as train_reader:
    with make_batch_reader('%s/df_100.parquet' % data_dir, num_epochs=1,
                                       schema_fields=all_cols) as val_reader:
train_ds = make_petastorm_dataset(train_reader) \
                        .unbatch() \
                        .map( 
                        lambda x: (tuple(getattr(x, col) for col in x_cols),getattr(x,"feature_8"))
                        ) \
                        .batch(batch_size) 
                                            

        val_ds = make_petastorm_dataset(val_reader) \
                        .unbatch() \
                        .map(
                        lambda x: (tuple(getattr(x, col) for col in x_cols), 
                                    getattr(x,"feature_8"))
                        ) \
                        .batch(batch_size) 

For this simple example I use the same data for train as validation. I want to confirm that the whole dataset is going to the model.fit() So I write a Custom Callback

class MyCustomCallback(tf.keras.callbacks.Callback):
  def __init__(self, train_data):
    self.mylist = []
    self.train_data = train_data

  def on_train_batch_begin(self, batch, logs=None):
    print(list(self.train_data.take(1).as_numpy_iterator())[0][0][0])

# and I pass the dataset to the custom callback:
callbacks.append(MyCustomCallback(train_ds))

doesn't print all the values...1 to 100. If I iterate over the dataset (simple for loop) without a model.fit then I do get all 1 to 100, so I think the take() is competing with the model.fit, just a theory.

I have also tried:

class MyCustomCallback(tf.keras.callbacks.Callback):

  def on_train_batch_begin(self, batch, logs=None):
    print(self.model.layers[0].input) # or .output
    #or
    #print(self.model.layers[0].get_weights())


But this doesn't get me any real values and get_weights() prints out empty arrays

this is what printing input prints out:

KerasTensor(type_spec=TensorSpec(shape=(None, 1), dtype=tf.float32, name='feature_1'), name='feature_1', description="created by layer 'feature_1'")

I have tried using K.eval() on the input and output of the layer as well and that ends up with a numpy problem that is not fixed by any of the eager settings.

I really don't think this should be so hard. I just want to peak at the dataset before it goes into training.

I have fooled around with repeat(), cache(), and simply iterating over the dataset before the model.fit but I don't like the idea that this happens before the model.fit and that unless it is cached it reshuffles it, etc...

But I also want to be able to arbitrarily look at the model, any value, any weight, at any time. I don't feel like I can access this stuff, but feel like I should be able to.

Any help is appreciated.

oh, and using tensorflow 2.6.2 atm with tf.keras


Solution

  • I think it all depends on the size of your batch_size because take(1) takes one batch and if the batch_size is < 100 you will not see all the values. If, for example, you have batch_size=100, then you will definitely see the values 1 to 100:

    import pandas as pd
    import tensorflow as tf
    import numpy as np
    from petastorm.tf_utils import make_petastorm_dataset
    from petastorm.reader import make_batch_reader
    
    df = pd.DataFrame({'feature1':np.arange(1, 101), 
                  'feature2':np.arange(1, 101),
                  'feature3':np.arange(1, 101),
                  'feature4':np.arange(1, 101),
                  'feature5':np.arange(1, 101),
                  'feature6':np.arange(1, 101),
                  'feature7':np.arange(1, 101),
                  'feature8':np.arange(1, 101)})
    columns = list(df)
    df.to_parquet('file.parquet')
    x_cols = columns[:-1]
    batch_size = 100
    
    class MyCustomCallback(tf.keras.callbacks.Callback):
      def __init__(self, train_data):
        self.mylist = []
        self.train_data = train_data
    
      def on_train_batch_begin(self, batch, logs=None):
        tf.print(list(self.train_data.take(1).as_numpy_iterator())[0][0][0])
    
    
    with make_batch_reader('file:///content/file.parquet', num_epochs=1,
                                       schema_fields=columns) as train_reader:
      train_ds = make_petastorm_dataset(train_reader) \
                            .unbatch() \
                            .map( 
                            lambda x: (tuple(getattr(x, col) for col in x_cols),getattr(x,"feature8"))
                            ) \
                            .shuffle(buffer_size=1000).batch(batch_size)
                            
      inputs = [tf.keras.layers.Input(shape=(1,),name=col) for col in x_cols]
      merged = tf.keras.layers.Concatenate(axis=1)(inputs)
      x = tf.keras.layers.Dense(50, activation="relu")(merged)
      x = tf.keras.layers.Dense(20,activation="relu")(x)
      outputs = tf.keras.layers.Dense(101, activation="softmax")(x)
      model = tf.keras.Model(inputs=inputs, outputs=outputs)
      opt = tf.keras.optimizers.Adam(learning_rate=.001)
    
      model.compile(loss="sparse_categorical_crossentropy", optimizer=opt,metrics=['accuracy'])
      model.fit(train_ds, epochs=2, callbacks=[MyCustomCallback(train_ds)])
    
    Epoch 1/2
    array([  1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,  13,
            14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,  26,
            27,  28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,  39,
            40,  41,  42,  43,  44,  45,  46,  47,  48,  49,  50,  51,  52,
            53,  54,  55,  56,  57,  58,  59,  60,  61,  62,  63,  64,  65,
            66,  67,  68,  69,  70,  71,  72,  73,  74,  75,  76,  77,  78,
            79,  80,  81,  82,  83,  84,  85,  86,  87,  88,  89,  90,  91,
            92,  93,  94,  95,  96,  97,  98,  99, 100])
          1/Unknown - 1s 777ms/step - loss: 19.3339 - accuracy: 0.0100array([  1,   2,   3,   4,   5,   6,   7,   8,   9,  10,  11,  12,  13,
            14,  15,  16,  17,  18,  19,  20,  21,  22,  23,  24,  25,  26,
            27,  28,  29,  30,  31,  32,  33,  34,  35,  36,  37,  38,  39,
            40,  41,  42,  43,  44,  45,  46,  47,  48,  49,  50,  51,  52,
            53,  54,  55,  56,  57,  58,  59,  60,  61,  62,  63,  64,  65,
            66,  67,  68,  69,  70,  71,  72,  73,  74,  75,  76,  77,  78,
            79,  80,  81,  82,  83,  84,  85,  86,  87,  88,  89,  90,  91,
            92,  93,  94,  95,  96,  97,  98,  99, 100])
    1/1 [==============================] - 1s 899ms/step - loss: 19.3339 - accuracy: 0.0100
    ...
    

    Also, I am not sure what exactly the benefits of petastorm are, but if you are looking for an alternative, you could try tensorflow-io:

    import pandas as pd
    import tensorflow_io as tfio
    import tensorflow as tf
    import numpy as np
    
    df = pd.DataFrame({'feature1':np.arange(1, 101), 
                  'feature2':np.arange(1, 101),
                  'feature3':np.arange(1, 101),
                  'feature4':np.arange(1, 101),
                  'feature5':np.arange(1, 101),
                  'feature6':np.arange(1, 101),
                  'feature7':np.arange(1, 101),
                  'feature8':np.arange(1, 101)})
    columns = list(df)
    df.to_parquet('file.parquet')
    ds = tfio.IODataset.from_parquet('file.parquet', columns = columns)
    x_cols = columns[:-1]
    batch_size = 100
    
    train_ds = ds.map(lambda x: (tuple(x[col] for col in x_cols),x["feature8"])).shuffle(buffer_size=1000).batch(batch_size)
    inputs = [tf.keras.layers.Input(shape=(1,),name=col) for col in x_cols]
    merged = tf.keras.layers.Concatenate(axis=1)(inputs)
    x = tf.keras.layers.Dense(50, activation="relu")(merged)
    x = tf.keras.layers.Dense(20,activation="relu")(x)
    outputs = tf.keras.layers.Dense(101, activation="softmax")(x)
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    opt = tf.keras.optimizers.Adam(learning_rate=.001)
    
    model.compile(loss="sparse_categorical_crossentropy", optimizer=opt,metrics=['accuracy'])
    model.fit(train_ds, epochs=2, callbacks=[MyCustomCallback(train_ds)])
    

    Update 1: You can add each batch to an array in the Callback and at the end of each epoch, you could print the values and reset the array for the next epoch:

    class MyCustomCallback(tf.keras.callbacks.Callback):
      def __init__(self, train_data):
        self.mylist = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True, infer_shape=True)
        self.train_data = train_data
    
      def on_batch_end(self, batch, logs=None):
        self.mylist = self.mylist.write(self.mylist.size(), list(self.train_data.take(1).as_numpy_iterator())[0][0][0])
      
      def on_epoch_end(self, epoch, logs=None):
        arr = self.mylist.stack()
        tf.print(arr, summarize=-1)
        self.mylist = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True, infer_shape=True)