Search code examples
pythontensorflowkerastensorflow2.0tensorflow-datasets

The speed of training is reduced using a custom method in tensorflow.keras.layers


I'm using tensorflow.data and custom layers to solve the bottleneck of data augmentation, but I found that using tensorflow.data alone is faster than mixing, I don't know what's going on in the custom layers, can someone please tell me?

Thanks in advance!

This is my data augmentation code, mainly to do standardization and resize.

def random_normalization(data, mean, std):
    mean = tf.multiply(mean, tf.random.uniform(shape=(), minval=0.5,maxval=0.9, dtype=tf.float64))
    std = tf.multiply(std, tf.random.uniform(shape=(), minval=0.5,maxval=0.9, dtype=tf.float64))
    return tf.divide((tf.subtract(data, mean)), std)

def random_resize(data):
    def resizing(index, data, choice, enable, new_data, number, overlap):        
        FrontEnd = tf.cond(tf.math.greater_equal(tf.subtract(index, overlap), tf.constant(0)),
                           lambda: tf.subtract(index, overlap),
                           lambda: index)
        
        BackEnd = tf.cond(tf.math.less(tf.add(tf.add(index, 10),overlap),tf.constant(2000)),
                          lambda: tf.add(tf.add(index, 10),overlap),
                          lambda: index)
        
        z1 = tf.gather(data, indices=[0], axis=1)
        z1 = tf.gather(z1, indices=tf.range(FrontEnd, BackEnd), axis=0)
        
        z2 = tf.gather(data, indices=[1], axis=1)
        z2 = tf.gather(z2, indices=tf.range(FrontEnd, BackEnd), axis=0)
        
        z3 = tf.gather(data, indices=[2], axis=1)
        z3 = tf.gather(z3, indices=tf.range(FrontEnd, BackEnd), axis=0)
        
        z4 = tf.gather(data, indices=[3], axis=1)
        z4 = tf.gather(z4, indices=tf.range(FrontEnd, BackEnd), axis=0)
        
        z5 = tf.gather(data, indices=[4], axis=1)
        z5 = tf.gather(z5, indices=tf.range(FrontEnd, BackEnd), axis=0)
        
        z6 = tf.gather(data, indices=[5], axis=1)
        z6 = tf.gather(z6, indices=tf.range(FrontEnd, BackEnd), axis=0)
        
        
        new_data = tf.tensor_scatter_nd_update(new_data, [[number, 0], [number, 1], [number, 2],
                                                          [number, 3], [number, 4], [number, 5]], 
                                               [tf.math.reduce_mean(z1), tf.math.reduce_mean(z2),
                                                tf.math.reduce_mean(z3), tf.math.reduce_mean(z4),
                                                tf.math.reduce_mean(z5), tf.math.reduce_mean(z6)])
        
        
        return tf.add(index, 10), data, choice, enable, new_data, tf.add(number, 1), overlap
    
    choice = tf.random.uniform(shape=(), minval=0,maxval=4,dtype=tf.int32)
    enable = tf.random.uniform(shape=(), minval=0,maxval=1,dtype=tf.float64)
    overlap = tf.random.uniform(shape=(), minval=5,maxval=21,dtype=tf.int32)
    
    new_data = tf.zeros((200,6), dtype=tf.float64)
    index = tf.constant(0)
    number = tf.constant(0)
    condition = lambda index, data, choice, enable, new_data, number, overlap: tf.less(index, 2000)
    r = tf.while_loop(condition, resizing, loop_vars=(index, data, choice, enable, new_data, number, overlap))
    return r[4]

def normal_resize(data):
    data = tf.reshape(data, (2000,6,1))
    data = tf.image.resize(data, size=[200,6])
    return tf.cast(tf.reshape(data, (200,6)),dtype=tf.float64)

def augmentation(data, labels):
    mean = tf.math.reduce_mean(data,axis=0)
    std = tf.math.reduce_std(data,axis=0)
    data = tf.cond(tf.random.uniform(shape=(), minval=0, maxval=1,dtype=tf.float64) < tf.constant(0.8,dtype=tf.float64), 
                   lambda: random_normalization(data, mean, std), 
                   lambda: tf.divide((tf.subtract(data, mean)), std))
    
    # 2000 resize to 200
    data = tf.cond(tf.random.uniform(shape=(), minval=0, maxval=1,dtype=tf.float64) < tf.constant(0.8,dtype=tf.float64), 
                   lambda: random_resize(data), 
                   lambda: normal_resize(data))

    return data, labels

Main code, including tf.data and model

if __name__ == '__main__':
    trainDS = tf.data.Dataset.from_tensor_slices((np.random.rand(3000,2000,6),
                                                  np.concatenate((np.zeros((1500)),np.ones((1500))))))
    trainDS = (
        trainDS
        .cache()
        .shuffle(1000, reshuffle_each_iteration=False)
        .map(augmentation, num_parallel_calls=tf.data.AUTOTUNE)
        .batch(128, drop_remainder=True)
        .prefetch(tf.data.AUTOTUNE))
    
    input = Input((200,6))
    x = LSTM(64, return_sequences=True)(input)
    output = Dense(1,activation='sigmoid')(x)
    model = Model(input, output)
    model.compile(optimizer='adam', loss='BinaryCrossentropy')
    model.fit(trainDS, epochs=3)

Then this is the code of my custom layer, although it is a bit cumbersome, it still achieves the result I want.

import tensorflow as tf
from tensorflow.keras.layers import LSTM, Dense, Input
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Layer
import numpy as np

class CustomLayer(Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
    
    def execute(self, data, batch_size, new_data, _type):
        def _fun(index, data, _type, new_data):
            resized = tf.cond(_type,
                              lambda:augmentation(tf.reshape(tf.gather(data,[index]), (2000,6))),
                              lambda:normal_resize(tf.reshape(tf.gather(data,[index]), (2000,6))))
            values = tf.reshape(resized, (1,-1))[0]
            _Indices = self.createIndices(index)
            new_data = tf.tensor_scatter_nd_update(new_data, _Indices, values)
            return tf.add(index,1), data, _type, new_data
        
        index = tf.constant(0)
        condition = lambda index, data, _type, new_data: tf.less(index, batch_size)
        r = tf.while_loop(condition, _fun, loop_vars=(index, data, _type, new_data))
        return r[-1]
    
    def createIndices(self, BatchSizeIndex):
        def loop1(_i, BatchSizeIndex, col_num, _Indices):
            def loop2(_i, _j, BatchSizeIndex, col_num, _Indices):
                _Indices = tf.tensor_scatter_nd_update(_Indices, [[col_num, 0], [col_num, 1], [col_num, 2]], 
                                                        [BatchSizeIndex, _i, _j])
                return _i, tf.add(_j,1), BatchSizeIndex, tf.add(col_num,1), _Indices
            
            _j = tf.constant(0)
            condition_loop2 = lambda _i, _j, BatchSizeIndex, col_num, _Indices: tf.less(_j, 6)
            r_loop2 = tf.while_loop(condition_loop2, loop2, loop_vars=(_i, _j, BatchSizeIndex, col_num, _Indices))  
            return tf.add(_i,1), BatchSizeIndex, r_loop2[3], r_loop2[4]

        _Indices = tf.zeros((1200,3), dtype=tf.int32)
        col_num = tf.constant(0)
        _i = tf.constant(0)
        condition_loop1 = lambda _i, BatchSizeIndex, col_num, _Indices: tf.less(_i, 200)
        r_loop1 = tf.while_loop(condition_loop1, loop1, loop_vars=(_i, BatchSizeIndex, col_num, _Indices))
        return r_loop1[-1]
    
    def call(self, images, training):
        batch_size = tf.shape(images)[0]
        new_data = tf.zeros((batch_size, 200, 6), dtype=tf.float64)
        images = tf.cast(images, dtype=tf.float64)
        if training:
            data = self.execute(images, batch_size, new_data, tf.constant(True))
        else:
            data = self.execute(images, batch_size, new_data, tf.constant(False))
        
        return data

The final code can be modified to execute like this.

def augmentation(data):
    .....
    return data

if __name__ == '__main__':
    trainDS = tf.data.Dataset.from_tensor_slices((np.random.rand(3000,2000,6),
                                                  np.concatenate((np.zeros((1500)),np.ones((1500))))))
    trainDS = (
        trainDS
        .cache()
        .shuffle(1000, reshuffle_each_iteration=False)
        .batch(128, drop_remainder=True)
        .prefetch(tf.data.AUTOTUNE))
    
    input = Input((2000,6))
    x = CustomLayer()(input)
    x = LSTM(64, return_sequences=True)(x)
    output = Dense(1,activation='sigmoid')(x)
    model = Model(input, output)
    model.compile(optimizer='adam', loss='BinaryCrossentropy')
    model.fit(trainDS, epochs=3)

Results: Alone tf.data spend about 18s, tf.data+CustomLayer spend about 38s.

The thing I want to clarify is that the use of map in tf.data to run augmentation is on the CPU, but if I write augmentation in the Layer, it should theoretically run on the GPU. Why is there such a big gap between the two?

Environment: python3.6, tensorflow2.4.0


Solution

  • I reduced the tf.gather method in the code, and used tf.TensorArray instead of the tf.tensor_scatter_nd_update method, which really optimized my training speed, from 2min to 2s.